Merge branch 'master' into feature/seq_no
* master: Set vm.max_map_count on systemd package install [TEST] reduce the number of snapshotted shards to 1 in testSnapshotSucceedsAfterSnapshotFailure() so that we are more likely to trigger I/O exceptions on writing the control files during the finalize phase of snapshotting (with the aim of triggering an I/O failure when writing pending-index-*). Add documentation for Logger with Transport Client Enable appender exceptions in UpdateSettingsIT [TEST] remove AwaitsFix from testSnapshotSucceedsAfterSnapshotFailure, turns out the issue is specific to Java 9 v143 Cleanup formatting in UpdateSettingsIT.java [TEST] mute the testSnapshotSucceedsAfterSnapshotFailure() test until its clear what is going wrong. Mark SearchQueryIT test as awaits fix Makes snapshot throttling test go much faster (#21485) Breaking changes docs for template index_patterns [TEST] adds randomness between atomic and non-atomic move operations in MockRepository Cache successful shard deletion checks (#21438) Task cancellation command should wait for all child nodes to receive cancellation request before returning
This commit is contained in:
commit
1ea69b1a80
|
@ -840,7 +840,6 @@
|
|||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryStateTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]recovery[/\\]RecoveryStatusTests.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]settings[/\\]UpdateNumberOfReplicasIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]settings[/\\]UpdateSettingsIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]CloseIndexDisableCloseAllIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]OpenCloseIndexIT.java" checks="LineLength" />
|
||||
<suppress files="core[/\\]src[/\\]test[/\\]java[/\\]org[/\\]elasticsearch[/\\]indices[/\\]state[/\\]RareClusterStateIT.java" checks="LineLength" />
|
||||
|
|
|
@ -6,6 +6,7 @@ spatial4j = 0.6
|
|||
jts = 1.13
|
||||
jackson = 2.8.1
|
||||
snakeyaml = 1.15
|
||||
# When updating log4j, please update also docs/java-api/index.asciidoc
|
||||
log4j = 2.7
|
||||
slf4j = 1.6.2
|
||||
jna = 4.2.2
|
||||
|
|
|
@ -33,6 +33,7 @@ import org.elasticsearch.common.inject.Inject;
|
|||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.common.util.concurrent.AtomicArray;
|
||||
import org.elasticsearch.tasks.CancellableTask;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.tasks.TaskInfo;
|
||||
|
@ -46,6 +47,7 @@ import org.elasticsearch.transport.TransportResponse;
|
|||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
@ -118,12 +120,44 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
Set<String> childNodes = taskManager.cancel(cancellableTask, request.getReason(), banLock::onTaskFinished);
|
||||
if (childNodes != null) {
|
||||
if (childNodes.isEmpty()) {
|
||||
// The task has no child tasks, so we can return immediately
|
||||
logger.trace("cancelling task {} with no children", cancellableTask.getId());
|
||||
listener.onResponse(cancellableTask.taskInfo(clusterService.localNode().getId(), false));
|
||||
} else {
|
||||
// The task has some child tasks, we need to wait for until ban is set on all nodes
|
||||
logger.trace("cancelling task {} with children on nodes [{}]", cancellableTask.getId(), childNodes);
|
||||
setBanOnNodes(request.getReason(), cancellableTask, childNodes, banLock);
|
||||
listener.onResponse(cancellableTask.taskInfo(clusterService.localNode().getId(), false));
|
||||
String nodeId = clusterService.localNode().getId();
|
||||
AtomicInteger responses = new AtomicInteger(childNodes.size());
|
||||
List<Exception> failures = new ArrayList<>();
|
||||
setBanOnNodes(request.getReason(), cancellableTask, childNodes, new ActionListener<Void>() {
|
||||
@Override
|
||||
public void onResponse(Void aVoid) {
|
||||
processResponse();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onFailure(Exception e) {
|
||||
synchronized (failures) {
|
||||
failures.add(e);
|
||||
}
|
||||
processResponse();
|
||||
}
|
||||
|
||||
private void processResponse() {
|
||||
banLock.onBanSet();
|
||||
if (responses.decrementAndGet() == 0) {
|
||||
if (failures.isEmpty() == false) {
|
||||
IllegalStateException exception = new IllegalStateException("failed to cancel children of the task [" +
|
||||
cancellableTask.getId() + "]");
|
||||
failures.forEach(exception::addSuppressed);
|
||||
listener.onFailure(exception);
|
||||
} else {
|
||||
listener.onResponse(cancellableTask.taskInfo(nodeId, false));
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
}
|
||||
} else {
|
||||
logger.trace("task {} is already cancelled", cancellableTask.getId());
|
||||
|
@ -136,10 +170,10 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
return true;
|
||||
}
|
||||
|
||||
private void setBanOnNodes(String reason, CancellableTask task, Set<String> nodes, BanLock banLock) {
|
||||
private void setBanOnNodes(String reason, CancellableTask task, Set<String> nodes, ActionListener<Void> listener) {
|
||||
sendSetBanRequest(nodes,
|
||||
BanParentTaskRequest.createSetBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId()), reason),
|
||||
banLock);
|
||||
listener);
|
||||
}
|
||||
|
||||
private void removeBanOnNodes(CancellableTask task, Set<String> nodes) {
|
||||
|
@ -147,28 +181,29 @@ public class TransportCancelTasksAction extends TransportTasksAction<Cancellable
|
|||
BanParentTaskRequest.createRemoveBanParentTaskRequest(new TaskId(clusterService.localNode().getId(), task.getId())));
|
||||
}
|
||||
|
||||
private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request, BanLock banLock) {
|
||||
private void sendSetBanRequest(Set<String> nodes, BanParentTaskRequest request, ActionListener<Void> listener) {
|
||||
ClusterState clusterState = clusterService.state();
|
||||
for (String node : nodes) {
|
||||
DiscoveryNode discoveryNode = clusterState.getNodes().get(node);
|
||||
if (discoveryNode != null) {
|
||||
// Check if node still in the cluster
|
||||
logger.debug("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node,
|
||||
logger.trace("Sending ban for tasks with the parent [{}] to the node [{}], ban [{}]", request.parentTaskId, node,
|
||||
request.ban);
|
||||
transportService.sendRequest(discoveryNode, BAN_PARENT_ACTION_NAME, request,
|
||||
new EmptyTransportResponseHandler(ThreadPool.Names.SAME) {
|
||||
@Override
|
||||
public void handleResponse(TransportResponse.Empty response) {
|
||||
banLock.onBanSet();
|
||||
listener.onResponse(null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void handleException(TransportException exp) {
|
||||
banLock.onBanSet();
|
||||
logger.warn("Cannot send ban for tasks with the parent [{}] to the node [{}]", request.parentTaskId, node);
|
||||
listener.onFailure(exp);
|
||||
}
|
||||
});
|
||||
} else {
|
||||
banLock.onBanSet();
|
||||
listener.onResponse(null);
|
||||
logger.debug("Cannot send ban for tasks with the parent [{}] to the node [{}] - the node no longer in the cluster",
|
||||
request.parentTaskId, node);
|
||||
}
|
||||
|
|
|
@ -737,8 +737,9 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
final IndexMetaData metaData = clusterState.getMetaData().indices().get(shardId.getIndexName());
|
||||
|
||||
final IndexSettings indexSettings = buildIndexSettings(metaData);
|
||||
if (canDeleteShardContent(shardId, indexSettings) == false) {
|
||||
throw new IllegalStateException("Can't delete shard " + shardId);
|
||||
ShardDeletionCheckResult shardDeletionCheckResult = canDeleteShardContent(shardId, indexSettings);
|
||||
if (shardDeletionCheckResult != ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE) {
|
||||
throw new IllegalStateException("Can't delete shard " + shardId + " (cause: " + shardDeletionCheckResult + ")");
|
||||
}
|
||||
nodeEnv.deleteShardDirectorySafe(shardId, indexSettings);
|
||||
logger.debug("{} deleted shard reason [{}]", shardId, reason);
|
||||
|
@ -819,39 +820,50 @@ public class IndicesService extends AbstractLifecycleComponent
|
|||
}
|
||||
|
||||
/**
|
||||
* Returns <code>true</code> iff the shards content for the given shard can be deleted.
|
||||
* This method will return <code>false</code> if:
|
||||
* <ul>
|
||||
* <li>if the shard is still allocated / active on this node</li>
|
||||
* <li>if for instance if the shard is located on shared and should not be deleted</li>
|
||||
* <li>if the shards data locations do not exists</li>
|
||||
* </ul>
|
||||
* result type returned by {@link #canDeleteShardContent signaling different reasons why a shard can / cannot be deleted}
|
||||
*/
|
||||
public enum ShardDeletionCheckResult {
|
||||
FOLDER_FOUND_CAN_DELETE, // shard data exists and can be deleted
|
||||
STILL_ALLOCATED, // the shard is still allocated / active on this node
|
||||
NO_FOLDER_FOUND, // the shards data locations do not exist
|
||||
SHARED_FILE_SYSTEM, // the shard is located on shared and should not be deleted
|
||||
NO_LOCAL_STORAGE // node does not have local storage (see DiscoveryNode.nodeRequiresLocalStorage)
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns <code>ShardDeletionCheckResult</code> signaling whether the shards content for the given shard can be deleted.
|
||||
*
|
||||
* @param shardId the shard to delete.
|
||||
* @param indexSettings the shards's relevant {@link IndexSettings}. This is required to access the indexes settings etc.
|
||||
*/
|
||||
public boolean canDeleteShardContent(ShardId shardId, IndexSettings indexSettings) {
|
||||
public ShardDeletionCheckResult canDeleteShardContent(ShardId shardId, IndexSettings indexSettings) {
|
||||
assert shardId.getIndex().equals(indexSettings.getIndex());
|
||||
final IndexService indexService = indexService(shardId.getIndex());
|
||||
if (indexSettings.isOnSharedFilesystem() == false) {
|
||||
if (nodeEnv.hasNodeFile()) {
|
||||
final boolean isAllocated = indexService != null && indexService.hasShard(shardId.id());
|
||||
if (isAllocated) {
|
||||
return false; // we are allocated - can't delete the shard
|
||||
return ShardDeletionCheckResult.STILL_ALLOCATED; // we are allocated - can't delete the shard
|
||||
} else if (indexSettings.hasCustomDataPath()) {
|
||||
// lets see if it's on a custom path (return false if the shared doesn't exist)
|
||||
// we don't need to delete anything that is not there
|
||||
return Files.exists(nodeEnv.resolveCustomLocation(indexSettings, shardId));
|
||||
return Files.exists(nodeEnv.resolveCustomLocation(indexSettings, shardId)) ?
|
||||
ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE :
|
||||
ShardDeletionCheckResult.NO_FOLDER_FOUND;
|
||||
} else {
|
||||
// lets see if it's path is available (return false if the shared doesn't exist)
|
||||
// we don't need to delete anything that is not there
|
||||
return FileSystemUtils.exists(nodeEnv.availableShardPaths(shardId));
|
||||
return FileSystemUtils.exists(nodeEnv.availableShardPaths(shardId)) ?
|
||||
ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE :
|
||||
ShardDeletionCheckResult.NO_FOLDER_FOUND;
|
||||
}
|
||||
}
|
||||
} else {
|
||||
return ShardDeletionCheckResult.NO_LOCAL_STORAGE;
|
||||
}
|
||||
} else {
|
||||
logger.trace("{} skipping shard directory deletion due to shadow replicas", shardId);
|
||||
return ShardDeletionCheckResult.SHARED_FILE_SYSTEM;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private IndexSettings buildIndexSettings(IndexMetaData metaData) {
|
||||
|
|
|
@ -31,6 +31,8 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
|
|||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||
import org.elasticsearch.cluster.routing.IndexRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
|
||||
import org.elasticsearch.cluster.routing.RoutingNode;
|
||||
import org.elasticsearch.cluster.routing.RoutingTable;
|
||||
import org.elasticsearch.cluster.routing.ShardRouting;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.cluster.service.ClusterServiceState;
|
||||
|
@ -63,7 +65,10 @@ import java.io.Closeable;
|
|||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
|
@ -80,6 +85,9 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
private final TransportService transportService;
|
||||
private final ThreadPool threadPool;
|
||||
|
||||
// Cache successful shard deletion checks to prevent unnecessary file system lookups
|
||||
private final Set<ShardId> folderNotFoundCache = new HashSet<>();
|
||||
|
||||
private TimeValue deleteShardTimeout;
|
||||
|
||||
@Inject
|
||||
|
@ -115,11 +123,31 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
return;
|
||||
}
|
||||
|
||||
for (IndexRoutingTable indexRoutingTable : event.state().routingTable()) {
|
||||
RoutingTable routingTable = event.state().routingTable();
|
||||
|
||||
// remove entries from cache that don't exist in the routing table anymore (either closed or deleted indices)
|
||||
// - removing shard data of deleted indices is handled by IndicesClusterStateService
|
||||
// - closed indices don't need to be removed from the cache but we do it anyway for code simplicity
|
||||
for (Iterator<ShardId> it = folderNotFoundCache.iterator(); it.hasNext(); ) {
|
||||
ShardId shardId = it.next();
|
||||
if (routingTable.hasIndex(shardId.getIndex()) == false) {
|
||||
it.remove();
|
||||
}
|
||||
}
|
||||
// remove entries from cache which are allocated to this node
|
||||
final String localNodeId = event.state().nodes().getLocalNodeId();
|
||||
RoutingNode localRoutingNode = event.state().getRoutingNodes().node(localNodeId);
|
||||
if (localRoutingNode != null) {
|
||||
for (ShardRouting routing : localRoutingNode) {
|
||||
folderNotFoundCache.remove(routing.shardId());
|
||||
}
|
||||
}
|
||||
|
||||
for (IndexRoutingTable indexRoutingTable : routingTable) {
|
||||
// Note, closed indices will not have any routing information, so won't be deleted
|
||||
for (IndexShardRoutingTable indexShardRoutingTable : indexRoutingTable) {
|
||||
if (shardCanBeDeleted(event.state(), indexShardRoutingTable)) {
|
||||
ShardId shardId = indexShardRoutingTable.shardId();
|
||||
ShardId shardId = indexShardRoutingTable.shardId();
|
||||
if (folderNotFoundCache.contains(shardId) == false && shardCanBeDeleted(localNodeId, indexShardRoutingTable)) {
|
||||
IndexService indexService = indicesService.indexService(indexRoutingTable.getIndex());
|
||||
final IndexSettings indexSettings;
|
||||
if (indexService == null) {
|
||||
|
@ -128,15 +156,33 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
} else {
|
||||
indexSettings = indexService.getIndexSettings();
|
||||
}
|
||||
if (indicesService.canDeleteShardContent(shardId, indexSettings)) {
|
||||
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
|
||||
IndicesService.ShardDeletionCheckResult shardDeletionCheckResult = indicesService.canDeleteShardContent(shardId, indexSettings);
|
||||
switch (shardDeletionCheckResult) {
|
||||
case FOLDER_FOUND_CAN_DELETE:
|
||||
deleteShardIfExistElseWhere(event.state(), indexShardRoutingTable);
|
||||
break;
|
||||
case NO_FOLDER_FOUND:
|
||||
folderNotFoundCache.add(shardId);
|
||||
break;
|
||||
case NO_LOCAL_STORAGE:
|
||||
assert false : "shard deletion only runs on data nodes which always have local storage";
|
||||
// nothing to do
|
||||
break;
|
||||
case STILL_ALLOCATED:
|
||||
// nothing to do
|
||||
break;
|
||||
case SHARED_FILE_SYSTEM:
|
||||
// nothing to do
|
||||
break;
|
||||
default:
|
||||
assert false : "unknown shard deletion check result: " + shardDeletionCheckResult;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
boolean shardCanBeDeleted(ClusterState state, IndexShardRoutingTable indexShardRoutingTable) {
|
||||
static boolean shardCanBeDeleted(String localNodeId, IndexShardRoutingTable indexShardRoutingTable) {
|
||||
// a shard can be deleted if all its copies are active, and its not allocated on this node
|
||||
if (indexShardRoutingTable.size() == 0) {
|
||||
// should not really happen, there should always be at least 1 (primary) shard in a
|
||||
|
@ -146,27 +192,12 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
|
||||
for (ShardRouting shardRouting : indexShardRoutingTable) {
|
||||
// be conservative here, check on started, not even active
|
||||
if (!shardRouting.started()) {
|
||||
if (shardRouting.started() == false) {
|
||||
return false;
|
||||
}
|
||||
|
||||
// if the allocated or relocation node id doesn't exists in the cluster state it may be a stale node,
|
||||
// make sure we don't do anything with this until the routing table has properly been rerouted to reflect
|
||||
// the fact that the node does not exists
|
||||
DiscoveryNode node = state.nodes().get(shardRouting.currentNodeId());
|
||||
if (node == null) {
|
||||
return false;
|
||||
}
|
||||
if (shardRouting.relocatingNodeId() != null) {
|
||||
node = state.nodes().get(shardRouting.relocatingNodeId());
|
||||
if (node == null) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
// check if shard is active on the current node or is getting relocated to the our node
|
||||
String localNodeId = state.getNodes().getLocalNode().getId();
|
||||
if (localNodeId.equals(shardRouting.currentNodeId()) || localNodeId.equals(shardRouting.relocatingNodeId())) {
|
||||
// check if shard is active on the current node
|
||||
if (localNodeId.equals(shardRouting.currentNodeId())) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
@ -179,16 +210,9 @@ public class IndicesStore extends AbstractComponent implements ClusterStateListe
|
|||
String indexUUID = indexShardRoutingTable.shardId().getIndex().getUUID();
|
||||
ClusterName clusterName = state.getClusterName();
|
||||
for (ShardRouting shardRouting : indexShardRoutingTable) {
|
||||
// Node can't be null, because otherwise shardCanBeDeleted() would have returned false
|
||||
assert shardRouting.started() : "expected started shard but was " + shardRouting;
|
||||
DiscoveryNode currentNode = state.nodes().get(shardRouting.currentNodeId());
|
||||
assert currentNode != null;
|
||||
|
||||
requests.add(new Tuple<>(currentNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId(), deleteShardTimeout)));
|
||||
if (shardRouting.relocatingNodeId() != null) {
|
||||
DiscoveryNode relocatingNode = state.nodes().get(shardRouting.relocatingNodeId());
|
||||
assert relocatingNode != null;
|
||||
requests.add(new Tuple<>(relocatingNode, new ShardActiveRequest(clusterName, indexUUID, shardRouting.shardId(), deleteShardTimeout)));
|
||||
}
|
||||
}
|
||||
|
||||
ShardActiveResponseHandler responseHandler = new ShardActiveResponseHandler(indexShardRoutingTable.shardId(), state.getVersion(),
|
||||
|
|
|
@ -39,6 +39,7 @@ import org.elasticsearch.index.IndexService;
|
|||
import org.elasticsearch.index.IndexSettings;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.index.shard.ShardPath;
|
||||
import org.elasticsearch.indices.IndicesService.ShardDeletionCheckResult;
|
||||
import org.elasticsearch.test.ESSingleNodeTestCase;
|
||||
import org.elasticsearch.test.IndexSettingsModule;
|
||||
|
||||
|
@ -92,16 +93,19 @@ public class IndicesServiceTests extends ESSingleNodeTestCase {
|
|||
1).build();
|
||||
IndexSettings indexSettings = IndexSettingsModule.newIndexSettings("test", meta.getSettings());
|
||||
ShardId shardId = new ShardId(meta.getIndex(), 0);
|
||||
assertFalse("no shard location", indicesService.canDeleteShardContent(shardId, indexSettings));
|
||||
assertEquals("no shard location", indicesService.canDeleteShardContent(shardId, indexSettings),
|
||||
ShardDeletionCheckResult.NO_FOLDER_FOUND);
|
||||
IndexService test = createIndex("test");
|
||||
shardId = new ShardId(test.index(), 0);
|
||||
assertTrue(test.hasShard(0));
|
||||
assertFalse("shard is allocated", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()));
|
||||
assertEquals("shard is allocated", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()),
|
||||
ShardDeletionCheckResult.STILL_ALLOCATED);
|
||||
test.removeShard(0, "boom");
|
||||
assertTrue("shard is removed", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()));
|
||||
assertEquals("shard is removed", indicesService.canDeleteShardContent(shardId, test.getIndexSettings()),
|
||||
ShardDeletionCheckResult.FOLDER_FOUND_CAN_DELETE);
|
||||
ShardId notAllocated = new ShardId(test.index(), 100);
|
||||
assertFalse("shard that was never on this node should NOT be deletable",
|
||||
indicesService.canDeleteShardContent(notAllocated, test.getIndexSettings()));
|
||||
assertEquals("shard that was never on this node should NOT be deletable",
|
||||
indicesService.canDeleteShardContent(notAllocated, test.getIndexSettings()), ShardDeletionCheckResult.NO_FOLDER_FOUND);
|
||||
}
|
||||
|
||||
public void testDeleteIndexStore() throws Exception {
|
||||
|
|
|
@ -38,9 +38,9 @@ import org.elasticsearch.common.settings.Setting;
|
|||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.IndexModule;
|
||||
import org.elasticsearch.index.IndexService;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.MergePolicyConfig;
|
||||
import org.elasticsearch.index.MergeSchedulerConfig;
|
||||
import org.elasticsearch.index.engine.VersionConflictEngineException;
|
||||
import org.elasticsearch.index.store.IndexStore;
|
||||
import org.elasticsearch.index.store.Store;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
|
@ -64,15 +64,16 @@ import static org.hamcrest.Matchers.nullValue;
|
|||
|
||||
public class UpdateSettingsIT extends ESIntegTestCase {
|
||||
|
||||
|
||||
public void testInvalidDynamicUpdate() {
|
||||
createIndex("test");
|
||||
IllegalArgumentException exception = expectThrows(IllegalArgumentException.class, () ->
|
||||
client().admin().indices().prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.dummy", "boom")
|
||||
)
|
||||
.execute().actionGet());
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder().put("index.dummy", "boom"))
|
||||
.execute()
|
||||
.actionGet());
|
||||
assertEquals(exception.getCause().getMessage(), "this setting goes boom");
|
||||
IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
|
||||
assertNotEquals(indexMetaData.getSettings().get("index.dummy"), "invalid dynamic value");
|
||||
|
@ -103,12 +104,13 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
public void testResetDefault() {
|
||||
createIndex("test");
|
||||
|
||||
client().admin().indices().prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.refresh_interval", -1)
|
||||
.put("index.translog.flush_threshold_size", "1024b")
|
||||
)
|
||||
.execute().actionGet();
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder().put("index.refresh_interval", -1).put("index.translog.flush_threshold_size", "1024b"))
|
||||
.execute()
|
||||
.actionGet();
|
||||
IndexMetaData indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
|
||||
assertEquals(indexMetaData.getSettings().get("index.refresh_interval"), "-1");
|
||||
for (IndicesService service : internalCluster().getInstances(IndicesService.class)) {
|
||||
|
@ -118,11 +120,13 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
assertEquals(indexService.getIndexSettings().getFlushThresholdSize().getBytes(), 1024);
|
||||
}
|
||||
}
|
||||
client().admin().indices().prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.putNull("index.refresh_interval")
|
||||
)
|
||||
.execute().actionGet();
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder().putNull("index.refresh_interval"))
|
||||
.execute()
|
||||
.actionGet();
|
||||
indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
|
||||
assertNull(indexMetaData.getSettings().get("index.refresh_interval"));
|
||||
for (IndicesService service : internalCluster().getInstances(IndicesService.class)) {
|
||||
|
@ -136,12 +140,15 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
public void testOpenCloseUpdateSettings() throws Exception {
|
||||
createIndex("test");
|
||||
try {
|
||||
client().admin().indices().prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.refresh_interval", -1) // this one can change
|
||||
.put("index.fielddata.cache", "none") // this one can't
|
||||
)
|
||||
.execute().actionGet();
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.refresh_interval", -1) // this one can change
|
||||
.put("index.fielddata.cache", "none")) // this one can't
|
||||
.execute()
|
||||
.actionGet();
|
||||
fail();
|
||||
} catch (IllegalArgumentException e) {
|
||||
// all is well
|
||||
|
@ -156,11 +163,13 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
assertThat(getSettingsResponse.getSetting("test", "index.refresh_interval"), nullValue());
|
||||
assertThat(getSettingsResponse.getSetting("test", "index.fielddata.cache"), nullValue());
|
||||
|
||||
client().admin().indices().prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.refresh_interval", -1) // this one can change
|
||||
)
|
||||
.execute().actionGet();
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder().put("index.refresh_interval", -1)) // this one can change
|
||||
.execute()
|
||||
.actionGet();
|
||||
|
||||
indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
|
||||
assertThat(indexMetaData.getSettings().get("index.refresh_interval"), equalTo("-1"));
|
||||
|
@ -171,29 +180,43 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
// now close the index, change the non dynamic setting, and see that it applies
|
||||
|
||||
// Wait for the index to turn green before attempting to close it
|
||||
ClusterHealthResponse health = client().admin().cluster().prepareHealth().setTimeout("30s").setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
|
||||
ClusterHealthResponse health =
|
||||
client()
|
||||
.admin()
|
||||
.cluster()
|
||||
.prepareHealth()
|
||||
.setTimeout("30s")
|
||||
.setWaitForEvents(Priority.LANGUID)
|
||||
.setWaitForGreenStatus()
|
||||
.execute()
|
||||
.actionGet();
|
||||
assertThat(health.isTimedOut(), equalTo(false));
|
||||
|
||||
client().admin().indices().prepareClose("test").execute().actionGet();
|
||||
|
||||
try {
|
||||
client().admin().indices().prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
|
||||
)
|
||||
.execute().actionGet();
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1))
|
||||
.execute()
|
||||
.actionGet();
|
||||
fail("can't change number of replicas on a closed index");
|
||||
} catch (IllegalArgumentException ex) {
|
||||
assertTrue(ex.getMessage(), ex.getMessage().startsWith("Can't update [index.number_of_replicas] on closed indices [[test/"));
|
||||
assertTrue(ex.getMessage(), ex.getMessage().endsWith("]] - can leave index in an unopenable state"));
|
||||
// expected
|
||||
}
|
||||
client().admin().indices().prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.refresh_interval", "1s") // this one can change
|
||||
.put("index.fielddata.cache", "none") // this one can't
|
||||
)
|
||||
.execute().actionGet();
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.refresh_interval", "1s") // this one can change
|
||||
.put("index.fielddata.cache", "none")) // this one can't
|
||||
.execute()
|
||||
.actionGet();
|
||||
|
||||
indexMetaData = client().admin().cluster().prepareState().execute().actionGet().getState().metaData().index("test");
|
||||
assertThat(indexMetaData.getSettings().get("index.refresh_interval"), equalTo("1s"));
|
||||
|
@ -209,15 +232,14 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
createIndex("test");
|
||||
client().prepareIndex("test", "type", "1").setSource("f", 1).get(); // set version to 1
|
||||
client().prepareDelete("test", "type", "1").get(); // sets version to 2
|
||||
client().prepareIndex("test", "type", "1").setSource("f", 2).setVersion(2).get(); // delete is still in cache this should work & set version to 3
|
||||
client().admin().indices().prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put("index.gc_deletes", 0)
|
||||
).get();
|
||||
// delete is still in cache this should work & set version to 3
|
||||
client().prepareIndex("test", "type", "1").setSource("f", 2).setVersion(2).get();
|
||||
client().admin().indices().prepareUpdateSettings("test").setSettings(Settings.builder().put("index.gc_deletes", 0)).get();
|
||||
|
||||
client().prepareDelete("test", "type", "1").get(); // sets version to 4
|
||||
Thread.sleep(300); // wait for cache time to change TODO: this needs to be solved better. To be discussed.
|
||||
assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3).setVersion(4), VersionConflictEngineException.class); // delete is should not be in cache
|
||||
// delete is should not be in cache
|
||||
assertThrows(client().prepareIndex("test", "type", "1").setSource("f", 3).setVersion(4), VersionConflictEngineException.class);
|
||||
|
||||
}
|
||||
|
||||
|
@ -263,9 +285,10 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "merge")
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.getKey(), "1mb"))
|
||||
.setSettings(
|
||||
Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "merge")
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_MAX_BYTES_PER_SEC_SETTING.getKey(), "1mb"))
|
||||
.get();
|
||||
|
||||
// Make sure setting says it is in fact changed:
|
||||
|
@ -303,8 +326,7 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "none"))
|
||||
.setSettings(Settings.builder().put(IndexStore.INDEX_STORE_THROTTLE_TYPE_SETTING.getKey(), "none"))
|
||||
.get();
|
||||
|
||||
// Optimize does a waitForMerges, which we must do to make sure all in-flight (throttled) merges finish:
|
||||
|
@ -364,25 +386,31 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
@Override
|
||||
public void append(LogEvent event) {
|
||||
String message = event.getMessage().getFormattedMessage();
|
||||
if (event.getLevel() == Level.TRACE &&
|
||||
event.getLoggerName().endsWith("lucene.iw")) {
|
||||
if (event.getLevel() == Level.TRACE && event.getLoggerName().endsWith("lucene.iw")) {
|
||||
}
|
||||
if (event.getLevel() == Level.INFO && message.contains("updating [index.merge.scheduler.max_thread_count] from [10000] to [1]")) {
|
||||
if (event.getLevel() == Level.INFO
|
||||
&& message.contains("updating [index.merge.scheduler.max_thread_count] from [10000] to [1]")) {
|
||||
sawUpdateMaxThreadCount = true;
|
||||
}
|
||||
if (event.getLevel() == Level.INFO && message.contains("updating [index.merge.scheduler.auto_throttle] from [true] to [false]")) {
|
||||
if (event.getLevel() == Level.INFO
|
||||
&& message.contains("updating [index.merge.scheduler.auto_throttle] from [true] to [false]")) {
|
||||
sawUpdateAutoThrottle = true;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean ignoreExceptions() {
|
||||
return false;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public void testUpdateAutoThrottleSettings() throws IllegalAccessException {
|
||||
MockAppender mockAppender = new MockAppender("testUpdateAutoThrottleSettings");
|
||||
mockAppender.start();
|
||||
Logger rootLogger = LogManager.getRootLogger();
|
||||
Level savedLevel = rootLogger.getLevel();
|
||||
Loggers.addAppender(rootLogger, mockAppender);
|
||||
Level savedLevel = rootLogger.getLevel();
|
||||
Loggers.setLevel(rootLogger, Level.TRACE);
|
||||
|
||||
try {
|
||||
|
@ -403,8 +431,7 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "false"))
|
||||
.setSettings(Settings.builder().put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "false"))
|
||||
.get();
|
||||
|
||||
// Make sure we log the change:
|
||||
|
@ -414,9 +441,9 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
GetSettingsResponse getSettingsResponse = client().admin().indices().prepareGetSettings("test").get();
|
||||
assertThat(getSettingsResponse.getSetting("test", MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey()), equalTo("false"));
|
||||
} finally {
|
||||
Loggers.setLevel(rootLogger, savedLevel);
|
||||
Loggers.removeAppender(rootLogger, mockAppender);
|
||||
mockAppender.stop();
|
||||
Loggers.setLevel(rootLogger, savedLevel);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -445,22 +472,24 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
));
|
||||
|
||||
{
|
||||
UpdateSettingsRequestBuilder updateBuilder = client().admin().indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1000")
|
||||
);
|
||||
UpdateSettingsRequestBuilder updateBuilder =
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder().put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1000"));
|
||||
exc = expectThrows(IllegalArgumentException.class,
|
||||
() -> updateBuilder.get());
|
||||
assertThat(exc.getMessage(), equalTo("maxThreadCount (= 1000) should be <= maxMergeCount (= 100)"));
|
||||
}
|
||||
|
||||
{
|
||||
UpdateSettingsRequestBuilder updateBuilder = client().admin().indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10")
|
||||
);
|
||||
UpdateSettingsRequestBuilder updateBuilder =
|
||||
client()
|
||||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder().put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10"));
|
||||
exc = expectThrows(IllegalArgumentException.class,
|
||||
() -> updateBuilder.get());
|
||||
assertThat(exc.getMessage(), equalTo("maxThreadCount (= 100) should be <= maxMergeCount (= 10)"));
|
||||
|
@ -494,9 +523,7 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
.admin()
|
||||
.indices()
|
||||
.prepareUpdateSettings("test")
|
||||
.setSettings(Settings.builder()
|
||||
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
|
||||
)
|
||||
.setSettings(Settings.builder().put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1"))
|
||||
.get();
|
||||
|
||||
// Make sure we log the change:
|
||||
|
@ -507,9 +534,9 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
assertThat(getSettingsResponse.getSetting("test", MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey()), equalTo("1"));
|
||||
|
||||
} finally {
|
||||
Loggers.setLevel(rootLogger, savedLevel);
|
||||
Loggers.removeAppender(rootLogger, mockAppender);
|
||||
mockAppender.stop();
|
||||
Loggers.setLevel(rootLogger, savedLevel);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -538,4 +565,5 @@ public class UpdateSettingsIT extends ESIntegTestCase {
|
|||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -63,37 +63,11 @@ public class IndicesStoreTests extends ESTestCase {
|
|||
NOT_STARTED_STATES = set.toArray(new ShardRoutingState[set.size()]);
|
||||
}
|
||||
|
||||
private static ThreadPool threadPool;
|
||||
|
||||
private IndicesStore indicesStore;
|
||||
private DiscoveryNode localNode;
|
||||
|
||||
private ClusterService clusterService;
|
||||
|
||||
@BeforeClass
|
||||
public static void beforeClass() {
|
||||
threadPool = new TestThreadPool("ShardReplicationTests");
|
||||
}
|
||||
|
||||
@AfterClass
|
||||
public static void afterClass() {
|
||||
ThreadPool.terminate(threadPool, 30, TimeUnit.SECONDS);
|
||||
threadPool = null;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() {
|
||||
public void createLocalNode() {
|
||||
localNode = new DiscoveryNode("abc", buildNewFakeTransportAddress(), emptyMap(), emptySet(), Version.CURRENT);
|
||||
clusterService = createClusterService(threadPool);
|
||||
TransportService transportService = new TransportService(clusterService.getSettings(), null, null,
|
||||
TransportService.NOOP_TRANSPORT_INTERCEPTOR, null);
|
||||
indicesStore = new IndicesStore(Settings.EMPTY, null, clusterService, transportService, null);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() throws Exception {
|
||||
super.tearDown();
|
||||
clusterService.close();
|
||||
}
|
||||
|
||||
public void testShardCanBeDeletedNoShardRouting() throws Exception {
|
||||
|
@ -104,7 +78,7 @@ public class IndicesStoreTests extends ESTestCase {
|
|||
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numShards).numberOfReplicas(numReplicas)));
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", "_na_", 1));
|
||||
|
||||
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
|
||||
assertFalse(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
|
||||
}
|
||||
|
||||
public void testShardCanBeDeletedNoShardStarted() throws Exception {
|
||||
|
@ -128,10 +102,11 @@ public class IndicesStoreTests extends ESTestCase {
|
|||
if (state == ShardRoutingState.UNASSIGNED) {
|
||||
unassignedInfo = new UnassignedInfo(UnassignedInfo.Reason.INDEX_CREATED, null);
|
||||
}
|
||||
routingTable.addShard(TestShardRouting.newShardRouting("test", i, "xyz", null, j == 0, state, unassignedInfo));
|
||||
routingTable.addShard(TestShardRouting.newShardRouting("test", i, randomBoolean() ? localNode.getId() : randomAsciiOfLength(10), null, j == 0, state, unassignedInfo));
|
||||
}
|
||||
}
|
||||
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
|
||||
|
||||
assertFalse(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
|
||||
}
|
||||
|
||||
public void testShardCanBeDeletedShardExistsLocally() throws Exception {
|
||||
|
@ -154,27 +129,7 @@ public class IndicesStoreTests extends ESTestCase {
|
|||
}
|
||||
|
||||
// Shard exists locally, can't delete shard
|
||||
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
|
||||
}
|
||||
|
||||
public void testShardCanBeDeletedNodeNotInList() throws Exception {
|
||||
int numShards = randomIntBetween(1, 7);
|
||||
int numReplicas = randomInt(2);
|
||||
|
||||
ClusterState.Builder clusterState = ClusterState.builder(new ClusterName("test"));
|
||||
clusterState.metaData(MetaData.builder().put(IndexMetaData.builder("test").settings(settings(Version.CURRENT)).numberOfShards(numShards).numberOfReplicas(numReplicas)));
|
||||
clusterState.nodes(DiscoveryNodes.builder().localNodeId(localNode.getId()).add(localNode));
|
||||
IndexShardRoutingTable.Builder routingTable = new IndexShardRoutingTable.Builder(new ShardId("test", "_na_", 1));
|
||||
for (int i = 0; i < numShards; i++) {
|
||||
String relocatingNodeId = randomBoolean() ? null : "def";
|
||||
routingTable.addShard(TestShardRouting.newShardRouting("test", i, "xyz", relocatingNodeId, true, ShardRoutingState.STARTED));
|
||||
for (int j = 0; j < numReplicas; j++) {
|
||||
routingTable.addShard(TestShardRouting.newShardRouting("test", i, "xyz", relocatingNodeId, false, ShardRoutingState.STARTED));
|
||||
}
|
||||
}
|
||||
|
||||
// null node -> false
|
||||
assertFalse(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
|
||||
assertFalse(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
|
||||
}
|
||||
|
||||
public void testShardCanBeDeletedNodeVersion() throws Exception {
|
||||
|
@ -196,7 +151,7 @@ public class IndicesStoreTests extends ESTestCase {
|
|||
}
|
||||
|
||||
// shard exist on other node (abc)
|
||||
assertTrue(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
|
||||
assertTrue(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
|
||||
}
|
||||
|
||||
public void testShardCanBeDeletedRelocatingNode() throws Exception {
|
||||
|
@ -221,6 +176,6 @@ public class IndicesStoreTests extends ESTestCase {
|
|||
}
|
||||
|
||||
// shard exist on other node (abc and def)
|
||||
assertTrue(indicesStore.shardCanBeDeleted(clusterState.build(), routingTable.build()));
|
||||
assertTrue(IndicesStore.shardCanBeDeleted(localNode.getId(), routingTable.build()));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -176,7 +176,6 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|||
ensureSearchWasCancelled(searchResponse);
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/21126")
|
||||
public void testCancellationOfScrollSearches() throws Exception {
|
||||
|
||||
List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
||||
|
@ -198,7 +197,6 @@ public class SearchCancellationIT extends ESIntegTestCase {
|
|||
}
|
||||
|
||||
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/21126")
|
||||
public void testCancellationOfScrollSearchesOnFollowupRequests() throws Exception {
|
||||
|
||||
List<ScriptedBlockPlugin> plugins = initBlockFactory();
|
||||
|
|
|
@ -1904,7 +1904,7 @@ public class SearchQueryIT extends ESIntegTestCase {
|
|||
assertHitCount(client().prepareSearch("test").setSize(0).setQuery(rangeQuery("field").lte(-999999999999L)).get(), 3);
|
||||
}
|
||||
|
||||
@AwaitsFix(bugUrl = "NOCOMMIT")
|
||||
@AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/21501")
|
||||
public void testRangeQueryWithTimeZone() throws Exception {
|
||||
assertAcked(prepareCreate("test")
|
||||
.addMapping("type1", "date", "type=date", "num", "type=integer"));
|
||||
|
|
|
@ -1499,8 +1499,8 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
.put("location", repositoryLocation)
|
||||
.put("compress", randomBoolean())
|
||||
.put("chunk_size", randomIntBetween(1000, 10000), ByteSizeUnit.BYTES)
|
||||
.put("max_restore_bytes_per_sec", throttleRestore ? "0.5k" : "0")
|
||||
.put("max_snapshot_bytes_per_sec", throttleSnapshot ? "0.5k" : "0")));
|
||||
.put("max_restore_bytes_per_sec", throttleRestore ? "10k" : "0")
|
||||
.put("max_snapshot_bytes_per_sec", throttleSnapshot ? "10k" : "0")));
|
||||
|
||||
createIndex("test-idx");
|
||||
ensureGreen();
|
||||
|
@ -2675,23 +2675,31 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
public void testSnapshotSucceedsAfterSnapshotFailure() throws Exception {
|
||||
logger.info("--> creating repository");
|
||||
final Path repoPath = randomRepoPath();
|
||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo").setType("mock").setVerify(false).setSettings(
|
||||
Settings.builder().put("location", repoPath).put("random_control_io_exception_rate", randomIntBetween(5, 20) / 100f)));
|
||||
final Client client = client();
|
||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo").setType("mock").setVerify(false).setSettings(
|
||||
Settings.builder()
|
||||
.put("location", repoPath)
|
||||
.put("random_control_io_exception_rate", randomIntBetween(5, 20) / 100f)
|
||||
.put("random", randomAsciiOfLength(10))));
|
||||
|
||||
logger.info("--> indexing some data");
|
||||
createIndex("test-idx");
|
||||
assertAcked(prepareCreate("test-idx").setSettings(
|
||||
// the less the number of shards, the less control files we have, so we are giving a higher probability of
|
||||
// triggering an IOException toward the end when writing the pending-index-* files, which are the files
|
||||
// that caused problems with writing subsequent snapshots if they happened to be lingering in the repository
|
||||
Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 1).put(SETTING_NUMBER_OF_REPLICAS, 0)));
|
||||
ensureGreen();
|
||||
final int numDocs = randomIntBetween(1, 5);
|
||||
for (int i = 0; i < numDocs; i++) {
|
||||
index("test-idx", "doc", Integer.toString(i), "foo", "bar" + i);
|
||||
}
|
||||
refresh();
|
||||
assertThat(client().prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo((long) numDocs));
|
||||
assertThat(client.prepareSearch("test-idx").setSize(0).get().getHits().totalHits(), equalTo((long) numDocs));
|
||||
|
||||
logger.info("--> snapshot with potential I/O failures");
|
||||
try {
|
||||
CreateSnapshotResponse createSnapshotResponse =
|
||||
client().admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
client.admin().cluster().prepareCreateSnapshot("test-repo", "test-snap")
|
||||
.setWaitForCompletion(true)
|
||||
.setIndices("test-idx")
|
||||
.get();
|
||||
|
@ -2702,21 +2710,21 @@ public class SharedClusterSnapshotRestoreIT extends AbstractSnapshotIntegTestCas
|
|||
assertThat(shardFailure.reason(), containsString("Random IOException"));
|
||||
}
|
||||
}
|
||||
} catch (Exception ex) {
|
||||
} catch (SnapshotCreationException | RepositoryException ex) {
|
||||
// sometimes, the snapshot will fail with a top level I/O exception
|
||||
assertThat(ExceptionsHelper.stackTrace(ex), containsString("Random IOException"));
|
||||
}
|
||||
|
||||
logger.info("--> snapshot with no I/O failures");
|
||||
assertAcked(client().admin().cluster().preparePutRepository("test-repo-2").setType("mock").setVerify(false).setSettings(
|
||||
assertAcked(client.admin().cluster().preparePutRepository("test-repo-2").setType("mock").setSettings(
|
||||
Settings.builder().put("location", repoPath)));
|
||||
CreateSnapshotResponse createSnapshotResponse =
|
||||
client().admin().cluster().prepareCreateSnapshot("test-repo-2", "test-snap-2")
|
||||
client.admin().cluster().prepareCreateSnapshot("test-repo-2", "test-snap-2")
|
||||
.setWaitForCompletion(true)
|
||||
.setIndices("test-idx")
|
||||
.get();
|
||||
assertEquals(0, createSnapshotResponse.getSnapshotInfo().failedShards());
|
||||
GetSnapshotsResponse getSnapshotsResponse = client().admin().cluster().prepareGetSnapshots("test-repo-2")
|
||||
GetSnapshotsResponse getSnapshotsResponse = client.admin().cluster().prepareGetSnapshots("test-repo-2")
|
||||
.addSnapshots("test-snap-2").get();
|
||||
assertEquals(SnapshotState.SUCCESS, getSnapshotsResponse.getSnapshots().get(0).state());
|
||||
}
|
||||
|
|
|
@ -33,6 +33,7 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
||||
import com.carrotsearch.randomizedtesting.RandomizedContext;
|
||||
import org.apache.lucene.index.CorruptIndexException;
|
||||
import org.elasticsearch.ElasticsearchException;
|
||||
import org.elasticsearch.cluster.metadata.MetaData;
|
||||
|
@ -321,20 +322,28 @@ public class MockRepository extends FsRepository {
|
|||
|
||||
@Override
|
||||
public void move(String sourceBlob, String targetBlob) throws IOException {
|
||||
// simulate a non-atomic move, since many blob container implementations
|
||||
// will not have an atomic move, and we should be able to handle that
|
||||
maybeIOExceptionOrBlock(targetBlob);
|
||||
super.writeBlob(targetBlob, super.readBlob(sourceBlob), 0L);
|
||||
super.deleteBlob(sourceBlob);
|
||||
if (RandomizedContext.current().getRandom().nextBoolean()) {
|
||||
// simulate a non-atomic move, since many blob container implementations
|
||||
// will not have an atomic move, and we should be able to handle that
|
||||
maybeIOExceptionOrBlock(targetBlob);
|
||||
super.writeBlob(targetBlob, super.readBlob(sourceBlob), 0L);
|
||||
super.deleteBlob(sourceBlob);
|
||||
} else {
|
||||
// atomic move since this inherits from FsBlobContainer which provides atomic moves
|
||||
maybeIOExceptionOrBlock(targetBlob);
|
||||
super.move(sourceBlob, targetBlob);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeBlob(String blobName, InputStream inputStream, long blobSize) throws IOException {
|
||||
maybeIOExceptionOrBlock(blobName);
|
||||
super.writeBlob(blobName, inputStream, blobSize);
|
||||
// for network based repositories, the blob may have been written but we may still
|
||||
// get an error with the client connection, so an IOException here simulates this
|
||||
maybeIOExceptionOrBlock(blobName);
|
||||
if (RandomizedContext.current().getRandom().nextBoolean()) {
|
||||
// for network based repositories, the blob may have been written but we may still
|
||||
// get an error with the client connection, so an IOException here simulates this
|
||||
maybeIOExceptionOrBlock(blobName);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -51,6 +51,11 @@ case "$1" in
|
|||
;;
|
||||
esac
|
||||
|
||||
# to pick up /usr/lib/sysctl.d/elasticsearch.conf
|
||||
if command -v /usr/lib/systemd/systemd-sysctl > /dev/null; then
|
||||
/usr/lib/systemd/systemd-sysctl
|
||||
fi
|
||||
|
||||
if [ "x$IS_UPGRADE" != "xtrue" ]; then
|
||||
if command -v systemctl >/dev/null; then
|
||||
echo "### NOT starting on installation, please execute the following statements to configure elasticsearch service to start automatically using systemd"
|
||||
|
|
|
@ -35,6 +35,70 @@ For example, you can define the latest version in your `pom.xml` file:
|
|||
</dependency>
|
||||
--------------------------------------------------
|
||||
|
||||
=== Log4j 2 Logger
|
||||
|
||||
You need to also include Log4j 2 dependencies:
|
||||
|
||||
["source","xml",subs="attributes"]
|
||||
--------------------------------------------------
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-api</artifactId>
|
||||
<version>2.7</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-core</artifactId>
|
||||
<version>2.7</version>
|
||||
</dependency>
|
||||
--------------------------------------------------
|
||||
|
||||
And also provide a Log4j 2 configuration file in your classpath.
|
||||
For example, you can add in your `src/main/resources` project dir a `log4j2.properties` file like:
|
||||
|
||||
|
||||
["source","properties",subs="attributes"]
|
||||
--------------------------------------------------
|
||||
appender.console.type = Console
|
||||
appender.console.name = console
|
||||
appender.console.layout.type = PatternLayout
|
||||
appender.console.layout.pattern = [%d{ISO8601}][%-5p][%-25c] %marker%m%n
|
||||
|
||||
rootLogger.level = info
|
||||
rootLogger.appenderRef.console.ref = console
|
||||
--------------------------------------------------
|
||||
|
||||
=== Using another Logger
|
||||
|
||||
If you want to use another logger than Log4j 2, you can use http://www.slf4j.org/[SLF4J] bridge to do that:
|
||||
|
||||
["source","xml",subs="attributes"]
|
||||
--------------------------------------------------
|
||||
<dependency>
|
||||
<groupId>org.apache.logging.log4j</groupId>
|
||||
<artifactId>log4j-to-slf4j</artifactId>
|
||||
<version>2.7</version>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-api</artifactId>
|
||||
<version>1.7.21</version>
|
||||
</dependency>
|
||||
--------------------------------------------------
|
||||
|
||||
http://www.slf4j.org/manual.html[This page] lists implementations you can use. Pick your favorite logger
|
||||
and add it as a dependency. As an example, we will use the `slf4j-simple` logger:
|
||||
|
||||
["source","xml",subs="attributes"]
|
||||
--------------------------------------------------
|
||||
<dependency>
|
||||
<groupId>org.slf4j</groupId>
|
||||
<artifactId>slf4j-simple</artifactId>
|
||||
<version>1.7.21</version>
|
||||
</dependency>
|
||||
--------------------------------------------------
|
||||
|
||||
|
||||
== Dealing with JAR dependency conflicts
|
||||
|
||||
If you want to use Elasticsearch in your Java application, you may have to deal with version conflicts with third party
|
||||
|
|
|
@ -31,6 +31,7 @@ way to reindex old indices is to use the `reindex` API.
|
|||
* <<breaking_60_cluster_changes>>
|
||||
* <<breaking_60_settings_changes>>
|
||||
* <<breaking_60_plugins_changes>>
|
||||
* <<breaking_60_indices_changes>>
|
||||
|
||||
include::migrate_6_0/cat.asciidoc[]
|
||||
|
||||
|
@ -45,3 +46,5 @@ include::migrate_6_0/cluster.asciidoc[]
|
|||
include::migrate_6_0/settings.asciidoc[]
|
||||
|
||||
include::migrate_6_0/plugins.asciidoc[]
|
||||
|
||||
include::migrate_6_0/indices.asciidoc[]
|
||||
|
|
|
@ -0,0 +1,29 @@
|
|||
[[breaking_60_indices_changes]]
|
||||
=== Templates changes
|
||||
|
||||
==== `template` is now `index_patterns`
|
||||
|
||||
Previously templates expressed the indices that they should match using a glob
|
||||
style pattern in the `template` field. They should now use the `index_patterns`
|
||||
field instead. As the name implies you can define multiple glob style patterns
|
||||
in an array but for convenience defining a single pattern as a bare string is
|
||||
also supported. So both of these examples are valid:
|
||||
|
||||
[source,js]
|
||||
--------------------------------------------------
|
||||
PUT _template/template_1
|
||||
{
|
||||
"index_patterns": ["te*", "bar*"],
|
||||
"settings": {
|
||||
"number_of_shards": 1
|
||||
}
|
||||
}
|
||||
PUT _template/template_2
|
||||
{
|
||||
"index_patterns": "te*",
|
||||
"settings": {
|
||||
"number_of_shards": 1
|
||||
}
|
||||
}
|
||||
--------------------------------------------------
|
||||
// CONSOLE
|
|
@ -123,6 +123,7 @@ verify_package_installation() {
|
|||
assert_file "/usr/lib/systemd/system/elasticsearch.service" f root root 644
|
||||
assert_file "/usr/lib/tmpfiles.d/elasticsearch.conf" f root root 644
|
||||
assert_file "/usr/lib/sysctl.d/elasticsearch.conf" f root root 644
|
||||
[[ $(sysctl vm.max_map_count) =~ "vm.max_map_count = 262144" ]]
|
||||
fi
|
||||
|
||||
if is_sysvinit; then
|
||||
|
|
Loading…
Reference in New Issue