mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-03-25 01:19:02 +00:00
Add index block api (#58716)
Adds an API for putting an index block in place. For write blocks, the API also ensures that, once it has returned successfully to the user, all shards of the index are properly accounting for the block; for example, all in-flight writes to the index have completed after the write block was added. This API allows coordinating more complex workflows where it is crucial that an index no longer receives writes after the API completes, for example when marking an index as read-only during an upgrade in order to reindex its documents.
This commit is contained in:
parent
def5550df3
commit
b885cbff1a
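As an editor's illustration (not code from this commit), the block semantics described in the commit message can be modelled in a few lines: once a write block is in place, indexing requests fail with a cluster block error while reads keep working.

```python
# Toy model of index-block semantics: an editor's sketch, not Elasticsearch
# code. Block names follow the API introduced by this commit.
class IndexModel:
    def __init__(self):
        self.docs = {}
        self.blocks = set()

    def add_block(self, block):
        # Mirrors PUT /<index>/_block/<block>
        self.blocks.add(block)
        return {"acknowledged": True}

    def index_doc(self, doc_id, source):
        # Write blocks (and read-only blocks) reject writes
        if self.blocks & {"write", "read_only", "read_only_allow_delete"}:
            raise RuntimeError("cluster_block_exception")
        self.docs[doc_id] = source

    def search(self):
        # Only a read block disables searches
        if "read" in self.blocks:
            raise RuntimeError("cluster_block_exception")
        return list(self.docs.values())
```

The YAML REST test added later in this commit exercises exactly this flow against a real cluster: index a document, add a `write` block, expect `cluster_block_exception` on writes while searches still succeed.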
@ -802,7 +802,8 @@ public class RestHighLevelClientTests extends ESTestCase {
             "render_search_template",
             "scripts_painless_execute",
             "indices.simulate_template",
-            "indices.resolve_index"
+            "indices.resolve_index",
+            "indices.add_block"
         };
         //These API are not required for high-level client feature completeness
         String[] notRequiredApi = new String[] {
@ -173,35 +173,6 @@ specific index module:
     for the <<analysis-shingle-tokenfilter,`shingle` token filter>>. Defaults to
     `3`.
 
-`index.blocks.read_only`::
-
-    Set to `true` to make the index and index metadata read only, `false` to
-    allow writes and metadata changes.
-
-`index.blocks.read_only_allow_delete`::
-
-    Similar to `index.blocks.read_only`, but also allows deleting the index to
-    make more resources available. The <<shard-allocation-awareness,disk-based shard
-    allocator>> may add and remove this block automatically.
-
-Deleting documents from an index to release resources - rather than deleting the index itself - can increase the index size over time. When `index.blocks.read_only_allow_delete` is set to `true`, deleting documents is not permitted. However, deleting the index itself releases the read-only index block and makes resources available almost immediately.
-
-IMPORTANT: {es} adds and removes the read-only index block automatically when the disk utilization falls below the high watermark, controlled by <<cluster-routing-flood_stage,cluster.routing.allocation.disk.watermark.flood_stage>>.
-
-`index.blocks.read`::
-
-    Set to `true` to disable read operations against the index.
-
-`index.blocks.write`::
-
-    Set to `true` to disable data write operations against the index. Unlike `read_only`,
-    this setting does not affect metadata. For instance, you can close an index with a `write`
-    block, but not an index with a `read_only` block.
-
-`index.blocks.metadata`::
-
-    Set to `true` to disable index metadata reads and writes.
-
 `index.max_refresh_listeners`::
 
     Maximum number of refresh listeners available on each shard of the index.
@ -321,6 +292,8 @@ include::index-modules/analysis.asciidoc[]
 
 include::index-modules/allocation.asciidoc[]
 
+include::index-modules/blocks.asciidoc[]
+
 include::index-modules/mapper.asciidoc[]
 
 include::index-modules/merge.asciidoc[]
146 docs/reference/index-modules/blocks.asciidoc
Normal file
@ -0,0 +1,146 @@
[[index-modules-blocks]]
== Index blocks

Index blocks limit the kinds of operations that are available on a certain
index. The blocks come in different flavours, allowing you to block write,
read, or metadata operations. Blocks can be set or removed using dynamic
index settings, or can be added using a dedicated API, which for write
blocks also ensures that, once it has returned successfully to the user,
all shards of the index are properly accounting for the block, for example
that all in-flight writes to the index have been completed after adding the
write block.

[discrete]
[[index-block-settings]]
=== Index block settings

The following _dynamic_ index settings determine the blocks present on an
index:

`index.blocks.read_only`::

    Set to `true` to make the index and index metadata read only, `false` to
    allow writes and metadata changes.

`index.blocks.read_only_allow_delete`::

    Similar to `index.blocks.read_only`, but also allows deleting the index to
    make more resources available. The <<shard-allocation-awareness,disk-based shard
    allocator>> may add and remove this block automatically.

Deleting documents from an index to release resources - rather than deleting the index itself - can increase the index size over time. When `index.blocks.read_only_allow_delete` is set to `true`, deleting documents is not permitted. However, deleting the index itself releases the read-only index block and makes resources available almost immediately.

IMPORTANT: {es} adds and removes the read-only index block automatically when the disk utilization falls below the high watermark, controlled by <<cluster-routing-flood_stage,cluster.routing.allocation.disk.watermark.flood_stage>>.

`index.blocks.read`::

    Set to `true` to disable read operations against the index.

`index.blocks.write`::

    Set to `true` to disable data write operations against the index. Unlike `read_only`,
    this setting does not affect metadata. For instance, you can close an index with a `write`
    block, but not an index with a `read_only` block.

`index.blocks.metadata`::

    Set to `true` to disable index metadata reads and writes.

[discrete]
[[add-index-block]]
=== Add index block API

Adds an index block to an index.

[source,console]
--------------------------------------------------
PUT /twitter/_block/write
--------------------------------------------------
// TEST[setup:twitter]


[discrete]
[[add-index-block-api-request]]
==== {api-request-title}

`PUT /<index>/_block/<block>`


[discrete]
[role="child_attributes"]
[[add-index-block-api-path-params]]
==== {api-path-parms-title}

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=index]
+
To add blocks to all indices, use `_all` or `*`. To disallow the adding
of blocks to indices with `_all` or wildcard expressions,
change the `action.destructive_requires_name` cluster setting to `true`.
You can update this setting in the `elasticsearch.yml` file
or using the <<cluster-update-settings,cluster update settings>> API.

`<block>`::
(Required, string)
Block type to add to the index.
+
.Valid values for `<block>`
[%collapsible%open]
====
`metadata`::
Disable metadata changes, such as closing the index.

`read`::
Disable read operations.

`read_only`::
Disable write operations and metadata changes.

`read_only_allow_delete`::
Disable write operations and metadata changes.
Document deletion is disabled.
However, index deletion is still allowed.

`write`::
Disable write operations. However, metadata changes are still allowed.
====

[discrete]
[[add-index-block-api-query-params]]
==== {api-query-parms-title}

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=allow-no-indices]
+
Defaults to `true`.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=expand-wildcards]
+
Defaults to `open`.

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=index-ignore-unavailable]

include::{es-repo-dir}/rest-api/common-parms.asciidoc[tag=timeoutparms]

[discrete]
[[add-index-block-api-example]]
==== {api-examples-title}

The following example shows how to add an index block:

[source,console]
--------------------------------------------------
PUT /my_index/_block/write
--------------------------------------------------
// TEST[s/^/PUT my_index\n/]

The API returns the following response:

[source,console-result]
--------------------------------------------------
{
  "acknowledged" : true,
  "shards_acknowledged" : true,
  "indices" : [ {
    "name" : "my_index",
    "blocked" : true
  } ]
}
--------------------------------------------------
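The settings section and the API documented above are two views of the same blocks: adding a block via `PUT /<index>/_block/<block>` is equivalent to enabling the matching `index.blocks.*` setting (plus the shard-level guarantee for write blocks). A small editor's sketch of that correspondence, with names taken from the settings section:

```python
# Editor's sketch: each <block> path parameter corresponds to a dynamic
# index setting from the documentation above being set to true.
BLOCK_TO_SETTING = {
    "metadata": "index.blocks.metadata",
    "read": "index.blocks.read",
    "read_only": "index.blocks.read_only",
    "read_only_allow_delete": "index.blocks.read_only_allow_delete",
    "write": "index.blocks.write",
}

def settings_update_for(block):
    """Return the settings change equivalent to adding `block` via the API."""
    if block not in BLOCK_TO_SETTING:
        raise ValueError(f"unknown block type: {block}")
    return {BLOCK_TO_SETTING[block]: True}
```

This is also how the YAML test later in the commit removes the block again: it sets `index.blocks.write: false` through the update-settings API.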
@ -61,13 +61,20 @@ the current write index, the data stream must first be
 <<rollover-data-stream-ex,rolled over>> so that a new write index is created
 and then the previous write index can be closed.
 
 // end::closed-index[]
 
+[[open-index-api-wait-for-active-shards]]
+===== Wait for active shards
+
+// tag::wait-for-active-shards[]
+
 Because opening or closing an index allocates its shards, the
 <<create-index-wait-for-active-shards,`wait_for_active_shards`>> setting on
 index creation applies to the `_open` and `_close` index actions as well.
 
 // end::closed-index[]
+// end::wait-for-active-shards[]
 
 
 [[open-index-api-path-params]]
@ -0,0 +1,59 @@
{
  "indices.add_block":{
    "documentation":{
      "url":"https://www.elastic.co/guide/en/elasticsearch/reference/master/indices-blocks.html",
      "description":"Adds a block to an index."
    },
    "stability":"stable",
    "url":{
      "paths":[
        {
          "path":"/{index}/_block/{block}",
          "methods":[
            "PUT"
          ],
          "parts":{
            "index":{
              "type":"list",
              "description":"A comma separated list of indices to add a block to"
            },
            "block":{
              "type":"string",
              "description":"The block to add (one of read, write, read_only, metadata, read_only_allow_delete)"
            }
          }
        }
      ]
    },
    "params":{
      "timeout":{
        "type":"time",
        "description":"Explicit operation timeout"
      },
      "master_timeout":{
        "type":"time",
        "description":"Specify timeout for connection to master"
      },
      "ignore_unavailable":{
        "type":"boolean",
        "description":"Whether specified concrete indices should be ignored when unavailable (missing or closed)"
      },
      "allow_no_indices":{
        "type":"boolean",
        "description":"Whether to ignore if a wildcard indices expression resolves into no concrete indices. (This includes `_all` string or when no indices have been specified)"
      },
      "expand_wildcards":{
        "type":"enum",
        "options":[
          "open",
          "closed",
          "hidden",
          "none",
          "all"
        ],
        "default":"open",
        "description":"Whether to expand wildcard expression to concrete indices that are open, closed or both."
      }
    }
  }
}
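The REST spec above fixes the request shape: a `PUT` to `/{index}/_block/{block}` where `index` is a comma-separated list and `block` is one of five names. As an illustrative sketch (editor's code, not part of the commit), the request path can be derived directly from the spec:

```python
# Sketch derived from the indices.add_block REST spec above. VALID_BLOCKS
# mirrors the names listed in the "block" part description.
VALID_BLOCKS = {"read", "write", "read_only", "metadata", "read_only_allow_delete"}

def add_block_path(indices, block):
    """Build the path for PUT /{index}/_block/{block}.

    `indices` is a list of index names; the spec's "index" part is a
    comma separated list.
    """
    if block not in VALID_BLOCKS:
        raise ValueError(f"unknown block type: {block}")
    if not indices:
        raise ValueError("index is missing")
    return "/" + ",".join(indices) + "/_block/" + block
```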
@ -0,0 +1,33 @@
---
"Basic test for index blocks":
  - skip:
      version: " - 7.8.99"
      reason: "index block APIs have only been made available in 7.9.0"
  - do:
      indices.create:
        index: test_index
        body:
          settings:
            number_of_replicas: 0

  - do:
      indices.add_block:
        index: test_index
        block: write
  - is_true: acknowledged

  - do:
      catch: /cluster_block_exception/
      index:
        index: test_index
        body: { foo: bar }

  - do:
      search:
        index: test_index

  - do:
      indices.put_settings:
        index: test_index
        body:
          index.blocks.write: false
@ -19,19 +19,49 @@

package org.elasticsearch.blocks;

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequestBuilder;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.support.ActiveShardCount;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.test.BackgroundIndexer;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.concurrent.CountDownLatch;
import java.util.function.Consumer;
import java.util.stream.IntStream;

import static java.util.Collections.emptySet;
import static java.util.stream.Collectors.toList;
import static org.elasticsearch.action.support.IndicesOptions.lenientExpandOpen;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_BLOCKS_WRITE;
import static org.elasticsearch.cluster.metadata.IndexMetadata.SETTING_READ_ONLY;
import static org.elasticsearch.search.internal.SearchContext.TRACK_TOTAL_HITS_ACCURATE;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertBlocked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;

@ESIntegTestCase.ClusterScope(scope = ESIntegTestCase.Scope.TEST)
@ -83,11 +113,11 @@ public class SimpleBlocksIT extends ESIntegTestCase {
        canCreateIndex("test1");
        canIndexDocument("test1");
        client().admin().indices().prepareUpdateSettings("test1")
-                .setSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, true))
+                .setSettings(Settings.builder().put(SETTING_BLOCKS_WRITE, true))
                .execute().actionGet();
        canNotIndexDocument("test1");
        client().admin().indices().prepareUpdateSettings("test1")
-                .setSettings(Settings.builder().put(IndexMetadata.SETTING_BLOCKS_WRITE, false))
+                .setSettings(Settings.builder().put(SETTING_BLOCKS_WRITE, false))
                .execute().actionGet();
        canIndexDocument("test1");
    }
@ -143,11 +173,320 @@ public class SimpleBlocksIT extends ESIntegTestCase {

    private void setIndexReadOnly(String index, Object value) {
        HashMap<String, Object> newSettings = new HashMap<>();
-        newSettings.put(IndexMetadata.SETTING_READ_ONLY, value);
+        newSettings.put(SETTING_READ_ONLY, value);

        UpdateSettingsRequestBuilder settingsRequest = client().admin().indices().prepareUpdateSettings(index);
        settingsRequest.setSettings(newSettings);
        AcknowledgedResponse settingsResponse = settingsRequest.execute().actionGet();
        assertThat(settingsResponse, notNullValue());
    }

    public void testAddBlocksWhileExistingBlocks() {
        createIndex("test");
        ensureGreen("test");

        for (APIBlock otherBlock : APIBlock.values()) {

            for (APIBlock block : Arrays.asList(APIBlock.READ, APIBlock.WRITE)) {
                try {
                    enableIndexBlock("test", block.settingName());

                    // Adding a block is not blocked
                    AcknowledgedResponse addBlockResponse = client().admin().indices()
                        .prepareAddBlock(otherBlock, "test").get();
                    assertAcked(addBlockResponse);
                } finally {
                    disableIndexBlock("test", otherBlock.settingName());
                    disableIndexBlock("test", block.settingName());
                }
            }

            for (APIBlock block : Arrays.asList(APIBlock.READ_ONLY, APIBlock.METADATA, APIBlock.READ_ONLY_ALLOW_DELETE)) {
                boolean success = false;
                try {
                    enableIndexBlock("test", block.settingName());
                    // Adding a block is blocked when there is a metadata block and the new block to be added is not a metadata block
                    if (block.getBlock().contains(ClusterBlockLevel.METADATA_WRITE) &&
                        otherBlock.getBlock().contains(ClusterBlockLevel.METADATA_WRITE) == false) {
                        assertBlocked(client().admin().indices().prepareAddBlock(otherBlock, "test"));
                    } else {
                        assertAcked(client().admin().indices().prepareAddBlock(otherBlock, "test"));
                        success = true;
                    }
                } finally {
                    if (success) {
                        disableIndexBlock("test", otherBlock.settingName());
                    }
                    disableIndexBlock("test", block.settingName());
                }
            }
        }
    }

    public void testAddBlockToMissingIndex() {
        IndexNotFoundException e = expectThrows(IndexNotFoundException.class, () -> client().admin().indices()
            .prepareAddBlock(randomFrom(APIBlock.values()), "test").get());
        assertThat(e.getMessage(), is("no such index [test]"));
    }

    public void testAddBlockToOneMissingIndex() {
        createIndex("test1");
        final IndexNotFoundException e = expectThrows(IndexNotFoundException.class,
            () -> client().admin().indices().prepareAddBlock(randomFrom(APIBlock.values()), "test1", "test2").get());
        assertThat(e.getMessage(), is("no such index [test2]"));
    }

    public void testCloseOneMissingIndexIgnoreMissing() throws Exception {
        createIndex("test1");
        final APIBlock block = randomFrom(APIBlock.values());
        try {
            assertBusy(() -> assertAcked(client().admin().indices().prepareAddBlock(block, "test1", "test2")
                .setIndicesOptions(lenientExpandOpen())));
            assertIndexHasBlock(block, "test1");
        } finally {
            disableIndexBlock("test1", block);
        }
    }

    public void testAddBlockNoIndex() {
        final ActionRequestValidationException e = expectThrows(ActionRequestValidationException.class,
            () -> client().admin().indices().prepareAddBlock(randomFrom(APIBlock.values())).get());
        assertThat(e.getMessage(), containsString("index is missing"));
    }

    public void testAddBlockNullIndex() {
        expectThrows(NullPointerException.class,
            () -> client().admin().indices().prepareAddBlock(randomFrom(APIBlock.values()), (String[]) null));
    }

    public void testAddIndexBlock() throws Exception {
        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        createIndex(indexName);

        final int nbDocs = randomIntBetween(0, 50);
        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs)
            .mapToObj(i -> client().prepareIndex(indexName, "zzz").setId(String.valueOf(i)).setSource("num", i)).collect(toList()));

        final APIBlock block = randomFrom(APIBlock.values());
        try {
            assertAcked(client().admin().indices().prepareAddBlock(block, indexName));
            assertIndexHasBlock(block, indexName);
        } finally {
            disableIndexBlock(indexName, block);
        }

        client().admin().indices().prepareRefresh(indexName).get();
        assertHitCount(client().prepareSearch(indexName).setSize(0).get(), nbDocs);
    }

    public void testSameBlockTwice() throws Exception {
        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        createIndex(indexName);

        if (randomBoolean()) {
            indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, randomIntBetween(1, 10))
                .mapToObj(i -> client().prepareIndex(indexName, "zzz").setId(String.valueOf(i)).setSource("num", i)).collect(toList()));
        }
        final APIBlock block = randomFrom(APIBlock.values());
        try {
            assertAcked(client().admin().indices().prepareAddBlock(block, indexName));
            assertIndexHasBlock(block, indexName);
            // Second add block should be acked too, even if it was a METADATA block
            assertAcked(client().admin().indices().prepareAddBlock(block, indexName));
            assertIndexHasBlock(block, indexName);
        } finally {
            disableIndexBlock(indexName, block);
        }
    }

    public void testAddBlockToUnassignedIndex() throws Exception {
        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        assertAcked(prepareCreate(indexName)
            .setWaitForActiveShards(ActiveShardCount.NONE)
            .setSettings(Settings.builder().put("index.routing.allocation.include._name", "nothing").build()));

        final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
        assertThat(clusterState.metadata().indices().get(indexName).getState(), is(IndexMetadata.State.OPEN));
        assertThat(clusterState.routingTable().allShards().stream().allMatch(ShardRouting::unassigned), is(true));

        final APIBlock block = randomFrom(APIBlock.values());
        try {
            assertAcked(client().admin().indices().prepareAddBlock(block, indexName));
            assertIndexHasBlock(block, indexName);
        } finally {
            disableIndexBlock(indexName, block);
        }
    }

    public void testConcurrentAddBlock() throws InterruptedException {
        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        createIndex(indexName);

        final int nbDocs = randomIntBetween(10, 50);
        indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, nbDocs)
            .mapToObj(i -> client().prepareIndex(indexName, "zzz").setId(String.valueOf(i)).setSource("num", i)).collect(toList()));
        ensureYellowAndNoInitializingShards(indexName);

        final CountDownLatch startClosing = new CountDownLatch(1);
        final Thread[] threads = new Thread[randomIntBetween(2, 5)];

        final APIBlock block = randomFrom(APIBlock.values());

        try {
            for (int i = 0; i < threads.length; i++) {
                threads[i] = new Thread(() -> {
                    try {
                        startClosing.await();
                    } catch (InterruptedException e) {
                        throw new AssertionError(e);
                    }
                    try {
                        client().admin().indices().prepareAddBlock(block, indexName).get();
                        assertIndexHasBlock(block, indexName);
                    } catch (final ClusterBlockException e) {
                        assertThat(e.blocks(), hasSize(1));
                        assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id()));
                    }
                });
                threads[i].start();
            }

            startClosing.countDown();
            for (Thread thread : threads) {
                thread.join();
            }
            assertIndexHasBlock(block, indexName);
        } finally {
            disableIndexBlock(indexName, block);
        }
    }

    public void testAddBlockWhileIndexingDocuments() throws Exception {
        final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
        createIndex(indexName);

        final APIBlock block = randomFrom(APIBlock.values());

        int nbDocs = 0;

        try {
            try (BackgroundIndexer indexer = new BackgroundIndexer(indexName, "_doc", client(), 1000)) {
                indexer.setFailureAssertion(t -> {
                    Throwable cause = ExceptionsHelper.unwrapCause(t);
                    assertThat(cause, instanceOf(ClusterBlockException.class));
                    ClusterBlockException e = (ClusterBlockException) cause;
                    assertThat(e.blocks(), hasSize(1));
                    assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id()));
                });

                waitForDocs(randomIntBetween(10, 50), indexer);
                assertAcked(client().admin().indices().prepareAddBlock(block, indexName));
                indexer.stopAndAwaitStopped();
                nbDocs += indexer.totalIndexedDocs();
            }

            assertIndexHasBlock(block, indexName);
        } finally {
            disableIndexBlock(indexName, block);
        }
        refresh(indexName);
        assertHitCount(client().prepareSearch(indexName).setSize(0).setTrackTotalHitsUpTo(TRACK_TOTAL_HITS_ACCURATE).get(), nbDocs);
    }

    public void testAddBlockWhileDeletingIndices() throws Exception {
        final String[] indices = new String[randomIntBetween(3, 10)];
        for (int i = 0; i < indices.length; i++) {
            final String indexName = randomAlphaOfLength(10).toLowerCase(Locale.ROOT);
            createIndex(indexName);
            if (randomBoolean()) {
                indexRandom(randomBoolean(), false, randomBoolean(), IntStream.range(0, 10)
                    .mapToObj(n -> client().prepareIndex(indexName, "zzz").setId(String.valueOf(n))
                        .setSource("num", n)).collect(toList()));
            }
            indices[i] = indexName;
        }
        assertThat(client().admin().cluster().prepareState().get().getState().metadata().indices().size(), equalTo(indices.length));

        final List<Thread> threads = new ArrayList<>();
        final CountDownLatch latch = new CountDownLatch(1);

        final APIBlock block = randomFrom(APIBlock.values());

        Consumer<Exception> exceptionConsumer = t -> {
            Throwable cause = ExceptionsHelper.unwrapCause(t);
            if (cause instanceof ClusterBlockException) {
                ClusterBlockException e = (ClusterBlockException) cause;
                assertThat(e.blocks(), hasSize(1));
                assertTrue(e.blocks().stream().allMatch(b -> b.id() == block.getBlock().id()));
            } else {
                assertThat(cause, instanceOf(IndexNotFoundException.class));
            }
        };

        try {
            for (final String indexToDelete : indices) {
                threads.add(new Thread(() -> {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new AssertionError(e);
                    }
                    try {
                        assertAcked(client().admin().indices().prepareDelete(indexToDelete));
                    } catch (final Exception e) {
                        exceptionConsumer.accept(e);
                    }
                }));
            }
            for (final String indexToBlock : indices) {
                threads.add(new Thread(() -> {
                    try {
                        latch.await();
                    } catch (InterruptedException e) {
                        throw new AssertionError(e);
                    }
                    try {
                        client().admin().indices().prepareAddBlock(block, indexToBlock).get();
                    } catch (final Exception e) {
                        exceptionConsumer.accept(e);
                    }
                }));
            }

            for (Thread thread : threads) {
                thread.start();
            }
            latch.countDown();
            for (Thread thread : threads) {
                thread.join();
            }
        } finally {
            for (final String indexToBlock : indices) {
                try {
                    disableIndexBlock(indexToBlock, block);
                } catch (IndexNotFoundException infe) {
                    // ignore
                }
            }
        }
    }

    static void assertIndexHasBlock(APIBlock block, final String... indices) {
        final ClusterState clusterState = client().admin().cluster().prepareState().get().getState();
        for (String index : indices) {
            final IndexMetadata indexMetadata = clusterState.metadata().indices().get(index);
            final Settings indexSettings = indexMetadata.getSettings();
            assertThat(indexSettings.hasValue(block.settingName()), is(true));
            assertThat(indexSettings.getAsBoolean(block.settingName(), false), is(true));
            assertThat(clusterState.blocks().hasIndexBlock(index, block.getBlock()), is(true));
            assertThat("Index " + index + " must have only 1 block with [id=" + block.getBlock().id() + "]",
                clusterState.blocks().indices().getOrDefault(index, emptySet()).stream()
                    .filter(clusterBlock -> clusterBlock.id() == block.getBlock().id()).count(), equalTo(1L));
        }
    }

    public static void disableIndexBlock(String index, APIBlock block) {
        disableIndexBlock(index, block.settingName());
    }
}
@ -142,6 +142,8 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
 import org.elasticsearch.action.admin.indices.mapping.put.TransportPutMappingAction;
 import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
 import org.elasticsearch.action.admin.indices.open.TransportOpenIndexAction;
+import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockAction;
+import org.elasticsearch.action.admin.indices.readonly.TransportAddIndexBlockAction;
 import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
 import org.elasticsearch.action.admin.indices.recovery.TransportRecoveryAction;
 import org.elasticsearch.action.admin.indices.refresh.RefreshAction;
@ -300,6 +302,7 @@ import org.elasticsearch.rest.action.admin.cluster.RestVerifyRepositoryAction;
 import org.elasticsearch.rest.action.admin.cluster.dangling.RestDeleteDanglingIndexAction;
 import org.elasticsearch.rest.action.admin.cluster.dangling.RestImportDanglingIndexAction;
 import org.elasticsearch.rest.action.admin.cluster.dangling.RestListDanglingIndicesAction;
+import org.elasticsearch.rest.action.admin.indices.RestAddIndexBlockAction;
 import org.elasticsearch.rest.action.admin.indices.RestAnalyzeAction;
 import org.elasticsearch.rest.action.admin.indices.RestClearIndicesCacheAction;
 import org.elasticsearch.rest.action.admin.indices.RestCloseIndexAction;
@ -549,6 +552,7 @@ public class ActionModule extends AbstractModule {
         actions.register(CloseIndexAction.INSTANCE, TransportCloseIndexAction.class);
         actions.register(IndicesExistsAction.INSTANCE, TransportIndicesExistsAction.class);
         actions.register(TypesExistsAction.INSTANCE, TransportTypesExistsAction.class);
+        actions.register(AddIndexBlockAction.INSTANCE, TransportAddIndexBlockAction.class);
         actions.register(GetMappingsAction.INSTANCE, TransportGetMappingsAction.class);
         actions.register(GetFieldMappingsAction.INSTANCE, TransportGetFieldMappingsAction.class,
             TransportGetFieldMappingsIndexAction.class);
@ -701,6 +705,7 @@ public class ActionModule extends AbstractModule {
         registerHandler.accept(new RestDeleteIndexAction());
         registerHandler.accept(new RestCloseIndexAction());
         registerHandler.accept(new RestOpenIndexAction());
+        registerHandler.accept(new RestAddIndexBlockAction());
 
         registerHandler.accept(new RestUpdateSettingsAction());
         registerHandler.accept(new RestGetSettingsAction());
@ -0,0 +1,32 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action.admin.indices.readonly;

import org.elasticsearch.action.ActionType;

public class AddIndexBlockAction extends ActionType<AddIndexBlockResponse> {

    public static final AddIndexBlockAction INSTANCE = new AddIndexBlockAction();
    public static final String NAME = "indices:admin/block/add";

    private AddIndexBlockAction() {
        super(NAME, AddIndexBlockResponse::new);
    }
}
@@ -0,0 +1,49 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action.admin.indices.readonly;

import org.elasticsearch.cluster.ack.IndicesClusterStateUpdateRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;

/**
 * Cluster state update request that allows adding a block to one or more indices
 */
public class AddIndexBlockClusterStateUpdateRequest extends IndicesClusterStateUpdateRequest<AddIndexBlockClusterStateUpdateRequest> {

    private final APIBlock block;
    private long taskId;

    public AddIndexBlockClusterStateUpdateRequest(final APIBlock block, final long taskId) {
        this.block = block;
        this.taskId = taskId;
    }

    public long taskId() {
        return taskId;
    }

    public APIBlock getBlock() {
        return block;
    }

    public AddIndexBlockClusterStateUpdateRequest taskId(final long taskId) {
        this.taskId = taskId;
        return this;
    }
}
@@ -0,0 +1,125 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action.admin.indices.readonly;

import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequest;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.CollectionUtils;

import java.io.IOException;
import java.util.Objects;

import static org.elasticsearch.action.ValidateActions.addValidationError;

/**
 * A request to add a block to an index.
 */
public class AddIndexBlockRequest extends AcknowledgedRequest<AddIndexBlockRequest> implements IndicesRequest.Replaceable {

    private final APIBlock block;
    private String[] indices;
    private IndicesOptions indicesOptions = IndicesOptions.strictExpandOpen();

    public AddIndexBlockRequest(StreamInput in) throws IOException {
        super(in);
        indices = in.readStringArray();
        indicesOptions = IndicesOptions.readIndicesOptions(in);
        block = APIBlock.readFrom(in);
    }

    /**
     * Constructs a new request for the specified block and indices
     */
    public AddIndexBlockRequest(APIBlock block, String... indices) {
        this.block = Objects.requireNonNull(block);
        this.indices = Objects.requireNonNull(indices);
    }

    @Override
    public ActionRequestValidationException validate() {
        ActionRequestValidationException validationException = null;
        if (CollectionUtils.isEmpty(indices)) {
            validationException = addValidationError("index is missing", validationException);
        }
        return validationException;
    }

    /**
     * Returns the indices to be blocked
     */
    @Override
    public String[] indices() {
        return indices;
    }

    /**
     * Sets the indices to be blocked
     *
     * @param indices the indices to be blocked
     * @return the request itself
     */
    @Override
    public AddIndexBlockRequest indices(String... indices) {
        this.indices = Objects.requireNonNull(indices);
        return this;
    }

    /**
     * Specifies what type of requested indices to ignore and how to deal with wildcard expressions.
     * For example indices that don't exist.
     *
     * @return the desired behaviour regarding indices to ignore and wildcard indices expressions
     */
    @Override
    public IndicesOptions indicesOptions() {
        return indicesOptions;
    }

    /**
     * Specifies what type of requested indices to ignore and how to deal with wildcard expressions.
     * For example indices that don't exist.
     *
     * @param indicesOptions the desired behaviour regarding indices to ignore and wildcard indices expressions
     * @return the request itself
     */
    public AddIndexBlockRequest indicesOptions(IndicesOptions indicesOptions) {
        this.indicesOptions = indicesOptions;
        return this;
    }

    /**
     * Returns the block to be added
     */
    public APIBlock getBlock() {
        return block;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        out.writeStringArray(indices);
        indicesOptions.writeIndicesOptions(out);
        block.writeTo(out);
    }
}
@@ -0,0 +1,59 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action.admin.indices.readonly;

import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.action.support.master.AcknowledgedRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;

/**
 * Builder for an add index block request
 */
public class AddIndexBlockRequestBuilder
    extends AcknowledgedRequestBuilder<AddIndexBlockRequest, AddIndexBlockResponse, AddIndexBlockRequestBuilder> {

    public AddIndexBlockRequestBuilder(ElasticsearchClient client, AddIndexBlockAction action, APIBlock block, String... indices) {
        super(client, action, new AddIndexBlockRequest(block, indices));
    }

    /**
     * Sets the indices to be blocked
     *
     * @param indices the indices to be blocked
     * @return the request itself
     */
    public AddIndexBlockRequestBuilder setIndices(String... indices) {
        request.indices(indices);
        return this;
    }

    /**
     * Specifies what type of requested indices to ignore and how to deal with wildcard expressions.
     * For example indices that don't exist.
     *
     * @param indicesOptions the desired behaviour regarding indices to ignore and indices wildcard expressions
     * @return the request itself
     */
    public AddIndexBlockRequestBuilder setIndicesOptions(IndicesOptions indicesOptions) {
        request.indicesOptions(indicesOptions);
        return this;
    }
}
@@ -0,0 +1,275 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action.admin.indices.readonly;

import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.action.support.DefaultShardOperationFailedException;
import org.elasticsearch.action.support.master.ShardsAcknowledgedResponse;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.CollectionUtils;
import org.elasticsearch.common.xcontent.ToXContentFragment;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.Index;

import java.io.IOException;
import java.util.List;
import java.util.Objects;

import static java.util.Collections.unmodifiableList;

public class AddIndexBlockResponse extends ShardsAcknowledgedResponse {

    private final List<AddBlockResult> indices;

    AddIndexBlockResponse(StreamInput in) throws IOException {
        super(in, true, true);
        indices = unmodifiableList(in.readList(AddBlockResult::new));
    }

    public AddIndexBlockResponse(final boolean acknowledged, final boolean shardsAcknowledged, final List<AddBlockResult> indices) {
        super(acknowledged, shardsAcknowledged);
        this.indices = unmodifiableList(Objects.requireNonNull(indices));
    }

    public List<AddBlockResult> getIndices() {
        return indices;
    }

    @Override
    public void writeTo(StreamOutput out) throws IOException {
        super.writeTo(out);
        writeShardsAcknowledged(out);
        out.writeList(indices);
    }

    @Override
    protected void addCustomFields(final XContentBuilder builder, final Params params) throws IOException {
        super.addCustomFields(builder, params);
        builder.startArray("indices");
        for (AddBlockResult index : indices) {
            index.toXContent(builder, params);
        }
        builder.endArray();
    }

    @Override
    public String toString() {
        return Strings.toString(this);
    }

    public static class AddBlockResult implements Writeable, ToXContentFragment {

        private final Index index;
        private final @Nullable Exception exception;
        private final @Nullable AddBlockShardResult[] shards;

        public AddBlockResult(final Index index) {
            this(index, null, null);
        }

        public AddBlockResult(final Index index, final Exception failure) {
            this(index, Objects.requireNonNull(failure), null);
        }

        public AddBlockResult(final Index index, final AddBlockShardResult[] shards) {
            this(index, null, Objects.requireNonNull(shards));
        }

        private AddBlockResult(final Index index, @Nullable final Exception exception, @Nullable final AddBlockShardResult[] shards) {
            this.index = Objects.requireNonNull(index);
            this.exception = exception;
            this.shards = shards;
        }

        AddBlockResult(final StreamInput in) throws IOException {
            this.index = new Index(in);
            this.exception = in.readException();
            this.shards = in.readOptionalArray(AddBlockShardResult::new, AddBlockShardResult[]::new);
        }

        @Override
        public void writeTo(final StreamOutput out) throws IOException {
            index.writeTo(out);
            out.writeException(exception);
            out.writeOptionalArray(shards);
        }

        public Index getIndex() {
            return index;
        }

        public Exception getException() {
            return exception;
        }

        public AddBlockShardResult[] getShards() {
            return shards;
        }

        public boolean hasFailures() {
            if (exception != null) {
                return true;
            }
            if (shards != null) {
                for (AddBlockShardResult shard : shards) {
                    if (shard.hasFailures()) {
                        return true;
                    }
                }
            }
            return false;
        }

        @Override
        public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
            builder.startObject();
            {
                builder.field("name", index.getName());
                if (hasFailures()) {
                    if (exception != null) {
                        builder.startObject("exception");
                        ElasticsearchException.generateFailureXContent(builder, params, exception, true);
                        builder.endObject();
                    } else {
                        builder.startArray("failed_shards");
                        for (AddBlockShardResult shard : shards) {
                            if (shard.hasFailures()) {
                                shard.toXContent(builder, params);
                            }
                        }
                        builder.endArray();
                    }
                } else {
                    builder.field("blocked", true);
                }
            }
            return builder.endObject();
        }

        @Override
        public String toString() {
            return Strings.toString(this);
        }
    }

    public static class AddBlockShardResult implements Writeable, ToXContentFragment {

        private final int id;
        private final Failure[] failures;

        public AddBlockShardResult(final int id, final Failure[] failures) {
            this.id = id;
            this.failures = failures;
        }

        AddBlockShardResult(final StreamInput in) throws IOException {
            this.id = in.readVInt();
            this.failures = in.readOptionalArray(Failure::readFailure, Failure[]::new);
        }

        @Override
        public void writeTo(final StreamOutput out) throws IOException {
            out.writeVInt(id);
            out.writeOptionalArray(failures);
        }

        public boolean hasFailures() {
            return CollectionUtils.isEmpty(failures) == false;
        }

        public int getId() {
            return id;
        }

        public Failure[] getFailures() {
            return failures;
        }

        @Override
        public XContentBuilder toXContent(final XContentBuilder builder, final Params params) throws IOException {
            builder.startObject();
            {
                builder.field("id", String.valueOf(id));
                builder.startArray("failures");
                if (failures != null) {
                    for (Failure failure : failures) {
                        failure.toXContent(builder, params);
                    }
                }
                builder.endArray();
            }
            return builder.endObject();
        }

        @Override
        public String toString() {
            return Strings.toString(this);
        }

        public static class Failure extends DefaultShardOperationFailedException {

            private @Nullable String nodeId;

            private Failure(StreamInput in) throws IOException {
                super(in);
                nodeId = in.readOptionalString();
            }

            public Failure(final String index, final int shardId, final Throwable reason) {
                this(index, shardId, reason, null);
            }

            public Failure(final String index, final int shardId, final Throwable reason, final String nodeId) {
                super(index, shardId, reason);
                this.nodeId = nodeId;
            }

            public String getNodeId() {
                return nodeId;
            }

            @Override
            public void writeTo(final StreamOutput out) throws IOException {
                super.writeTo(out);
                out.writeOptionalString(nodeId);
            }

            @Override
            public XContentBuilder innerToXContent(final XContentBuilder builder, final Params params) throws IOException {
                if (nodeId != null) {
                    builder.field("node", nodeId);
                }
                return super.innerToXContent(builder, params);
            }

            @Override
            public String toString() {
                return Strings.toString(this);
            }

            static Failure readFailure(final StreamInput in) throws IOException {
                return new Failure(in);
            }
        }
    }
}
@@ -0,0 +1,125 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.action.admin.indices.readonly;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.DestructiveOperations;
import org.elasticsearch.action.support.master.TransportMasterNodeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlockException;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.MetadataIndexStateService;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.index.Index;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;

import java.io.IOException;
import java.util.Collections;

/**
 * Adds a single index level block to a given set of indices. Not only does it set the correct setting,
 * but it ensures that, in case of a write block, once successfully returning to the user, all shards
 * of the index are properly accounting for the block, for instance, when adding a write block all
 * in-flight writes to an index have been completed prior to the response being returned. These actions
 * are done in multiple cluster state updates (at least two). See also {@link TransportVerifyShardIndexBlockAction}
 * for the eventual delegation for shard-level verification.
 */
public class TransportAddIndexBlockAction extends TransportMasterNodeAction<AddIndexBlockRequest, AddIndexBlockResponse> {

    private static final Logger logger = LogManager.getLogger(TransportAddIndexBlockAction.class);

    private final MetadataIndexStateService indexStateService;
    private final DestructiveOperations destructiveOperations;

    @Inject
    public TransportAddIndexBlockAction(TransportService transportService, ClusterService clusterService,
                                        ThreadPool threadPool, MetadataIndexStateService indexStateService,
                                        ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
                                        DestructiveOperations destructiveOperations) {
        super(AddIndexBlockAction.NAME, transportService, clusterService, threadPool, actionFilters, AddIndexBlockRequest::new,
            indexNameExpressionResolver);
        this.indexStateService = indexStateService;
        this.destructiveOperations = destructiveOperations;
    }

    @Override
    protected String executor() {
        // no need to use a thread pool, we go async right away
        return ThreadPool.Names.SAME;
    }

    @Override
    protected AddIndexBlockResponse read(StreamInput in) throws IOException {
        return new AddIndexBlockResponse(in);
    }

    @Override
    protected void doExecute(Task task, AddIndexBlockRequest request, ActionListener<AddIndexBlockResponse> listener) {
        destructiveOperations.failDestructive(request.indices());
        super.doExecute(task, request, listener);
    }

    @Override
    protected ClusterBlockException checkBlock(AddIndexBlockRequest request, ClusterState state) {
        if (request.getBlock().getBlock().levels().contains(ClusterBlockLevel.METADATA_WRITE) &&
            state.blocks().global(ClusterBlockLevel.METADATA_WRITE).isEmpty()) {
            return null;
        }
        return state.blocks().indicesBlockedException(ClusterBlockLevel.METADATA_WRITE,
            indexNameExpressionResolver.concreteIndexNames(state, request));
    }

    @Override
    protected void masterOperation(AddIndexBlockRequest request, ClusterState state,
                                   ActionListener<AddIndexBlockResponse> listener) throws Exception {
        throw new UnsupportedOperationException("The task parameter is required");
    }

    @Override
    protected void masterOperation(final Task task,
                                   final AddIndexBlockRequest request,
                                   final ClusterState state,
                                   final ActionListener<AddIndexBlockResponse> listener) throws Exception {
        final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request);
        if (concreteIndices == null || concreteIndices.length == 0) {
            listener.onResponse(new AddIndexBlockResponse(true, false, Collections.emptyList()));
            return;
        }

        final AddIndexBlockClusterStateUpdateRequest addBlockRequest = new AddIndexBlockClusterStateUpdateRequest(request.getBlock(),
            task.getId())
            .ackTimeout(request.timeout())
            .masterNodeTimeout(request.masterNodeTimeout())
            .indices(concreteIndices);
        indexStateService.addIndexBlock(addBlockRequest, ActionListener.delegateResponse(listener, (delegatedListener, t) -> {
            logger.debug(() -> new ParameterizedMessage("failed to mark indices as readonly [{}]", (Object) concreteIndices), t);
            delegatedListener.onFailure(t);
        }));
    }
}
@ -0,0 +1,169 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
package org.elasticsearch.action.admin.indices.readonly;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.ActionType;
|
||||
import org.elasticsearch.action.support.ActionFilters;
|
||||
import org.elasticsearch.action.support.replication.ReplicationOperation;
|
||||
import org.elasticsearch.action.support.replication.ReplicationRequest;
|
||||
import org.elasticsearch.action.support.replication.ReplicationResponse;
|
||||
import org.elasticsearch.action.support.replication.TransportReplicationAction;
|
||||
import org.elasticsearch.cluster.action.shard.ShardStateAction;
|
||||
import org.elasticsearch.cluster.block.ClusterBlock;
|
||||
import org.elasticsearch.cluster.block.ClusterBlocks;
|
||||
import org.elasticsearch.cluster.service.ClusterService;
|
||||
import org.elasticsearch.common.inject.Inject;
|
||||
import org.elasticsearch.common.io.stream.StreamInput;
|
||||
import org.elasticsearch.common.io.stream.StreamOutput;
|
||||
import org.elasticsearch.common.lease.Releasable;
|
||||
import org.elasticsearch.common.settings.Settings;
|
||||
import org.elasticsearch.index.shard.IndexShard;
|
||||
import org.elasticsearch.index.shard.ShardId;
|
||||
import org.elasticsearch.indices.IndicesService;
|
||||
import org.elasticsearch.tasks.TaskId;
|
||||
import org.elasticsearch.threadpool.ThreadPool;
|
||||
import org.elasticsearch.transport.TransportService;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
|
||||
/**
|
||||
* Action used to verify whether shards have properly applied a given index block,
|
||||
* and are no longer executing any operations in violation of that block. This action
|
||||
* requests all operation permits of the shard in order to wait for all write operations
|
||||
* to complete.
|
||||
*/
|
||||
public class TransportVerifyShardIndexBlockAction extends TransportReplicationAction<
|
||||
TransportVerifyShardIndexBlockAction.ShardRequest, TransportVerifyShardIndexBlockAction.ShardRequest, ReplicationResponse> {
|
||||
|
||||
public static final String NAME = AddIndexBlockAction.NAME + "[s]";
|
||||
public static final ActionType<ReplicationResponse> TYPE = new ActionType<>(NAME, ReplicationResponse::new);
|
||||
protected Logger logger = LogManager.getLogger(getClass());
|
||||
|
||||
@Inject
|
||||
public TransportVerifyShardIndexBlockAction(final Settings settings, final TransportService transportService,
|
||||
final ClusterService clusterService, final IndicesService indicesService,
|
||||
final ThreadPool threadPool, final ShardStateAction stateAction,
|
||||
final ActionFilters actionFilters) {
|
||||
super(settings, NAME, transportService, clusterService, indicesService, threadPool, stateAction, actionFilters,
|
||||
ShardRequest::new, ShardRequest::new, ThreadPool.Names.MANAGEMENT);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ReplicationResponse newResponseInstance(StreamInput in) throws IOException {
|
||||
return new ReplicationResponse(in);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void acquirePrimaryOperationPermit(final IndexShard primary,
|
||||
final ShardRequest request,
|
||||
final ActionListener<Releasable> onAcquired) {
|
||||
primary.acquireAllPrimaryOperationsPermits(onAcquired, request.timeout());
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void acquireReplicaOperationPermit(final IndexShard replica,
|
||||
            final ShardRequest request,
            final ActionListener<Releasable> onAcquired,
            final long primaryTerm,
            final long globalCheckpoint,
            final long maxSeqNoOfUpdateOrDeletes) {
        replica.acquireAllReplicaOperationsPermits(primaryTerm, globalCheckpoint, maxSeqNoOfUpdateOrDeletes, onAcquired, request.timeout());
    }

    @Override
    protected void shardOperationOnPrimary(final ShardRequest shardRequest, final IndexShard primary,
                                           ActionListener<PrimaryResult<ShardRequest, ReplicationResponse>> listener) {
        ActionListener.completeWith(listener, () -> {
            executeShardOperation(shardRequest, primary);
            return new PrimaryResult<>(shardRequest, new ReplicationResponse());
        });
    }

    @Override
    protected ReplicaResult shardOperationOnReplica(ShardRequest shardRequest, IndexShard replica) {
        executeShardOperation(shardRequest, replica);
        return new ReplicaResult();
    }

    private void executeShardOperation(final ShardRequest request, final IndexShard indexShard) {
        final ShardId shardId = indexShard.shardId();
        if (indexShard.getActiveOperationsCount() != IndexShard.OPERATIONS_BLOCKED) {
            throw new IllegalStateException("index shard " + shardId +
                " is not blocking all operations while waiting for block " + request.clusterBlock());
        }

        final ClusterBlocks clusterBlocks = clusterService.state().blocks();
        if (clusterBlocks.hasIndexBlock(shardId.getIndexName(), request.clusterBlock()) == false) {
            throw new IllegalStateException("index shard " + shardId + " has not applied block " + request.clusterBlock());
        }
    }

    @Override
    protected ReplicationOperation.Replicas<ShardRequest> newReplicasProxy() {
        return new VerifyShardReadOnlyActionReplicasProxy();
    }

    /**
     * A {@link ReplicasProxy} that marks as stale the shards that are unavailable during the verification
     * and the flush of the shard. This is done to ensure that such shards won't be later promoted as primary
     * or reopened in an unverified state with potential non-flushed translog operations.
     */
    class VerifyShardReadOnlyActionReplicasProxy extends ReplicasProxy {
        @Override
        public void markShardCopyAsStaleIfNeeded(final ShardId shardId, final String allocationId, final long primaryTerm,
                                                 final ActionListener<Void> listener) {
            shardStateAction.remoteShardFailed(shardId, allocationId, primaryTerm, true, "mark copy as stale", null, listener);
        }
    }

    public static class ShardRequest extends ReplicationRequest<ShardRequest> {

        private final ClusterBlock clusterBlock;

        ShardRequest(StreamInput in) throws IOException {
            super(in);
            clusterBlock = new ClusterBlock(in);
        }

        public ShardRequest(final ShardId shardId, final ClusterBlock clusterBlock, final TaskId parentTaskId) {
            super(shardId);
            this.clusterBlock = Objects.requireNonNull(clusterBlock);
            setParentTask(parentTaskId);
        }

        @Override
        public String toString() {
            return "verify shard " + shardId + " before block with " + clusterBlock;
        }

        @Override
        public void writeTo(final StreamOutput out) throws IOException {
            super.writeTo(out);
            clusterBlock.writeTo(out);
        }

        public ClusterBlock clusterBlock() {
            return clusterBlock;
        }
    }
}
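The `executeShardOperation` check above only accepts the block once the shard reports `getActiveOperationsCount() == IndexShard.OPERATIONS_BLOCKED`, i.e. once every operation permit has been reclaimed, which is what guarantees that in-flight writes have drained before the write block is acknowledged. Below is a minimal, self-contained sketch of that permit-draining idea using a plain `java.util.concurrent.Semaphore` rather than Elasticsearch's `IndexShard` permit machinery; the `PermitSketch` class and its method names are illustrative, not part of this codebase:

```java
import java.util.concurrent.Semaphore;

// Illustrative sketch: a shard hands out one permit per in-flight operation.
// Blocking all operations means acquiring every permit, which can only
// succeed once all in-flight operations have completed and released theirs.
class PermitSketch {
    static final int TOTAL_PERMITS = 1000;
    private final Semaphore permits = new Semaphore(TOTAL_PERMITS);

    boolean tryStartOperation() {
        return permits.tryAcquire(); // a write holds one permit while running
    }

    void finishOperation() {
        permits.release();
    }

    void blockAllOperations() {
        // waits until no operation is in flight, then holds all permits
        permits.acquireUninterruptibly(TOTAL_PERMITS);
    }

    boolean allOperationsBlocked() {
        return permits.availablePermits() == 0;
    }
}
```

Once `blockAllOperations()` has returned, no write holds a permit and new writes are rejected, which is the property the verify action asserts before acknowledging the block.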
@@ -21,9 +21,6 @@ package org.elasticsearch.client;

import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequestBuilder;
import org.elasticsearch.action.admin.indices.alias.exists.AliasesExistRequestBuilder;
@@ -42,6 +39,9 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexResponse;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
@@ -73,6 +73,9 @@ import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequestBuild
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequestBuilder;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder;
import org.elasticsearch.action.admin.indices.recovery.RecoveryResponse;
@@ -117,6 +120,7 @@ import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryReques
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryRequestBuilder;
import org.elasticsearch.action.admin.indices.validate.query.ValidateQueryResponse;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;
import org.elasticsearch.common.Nullable;

/**
@@ -347,6 +351,23 @@ public interface IndicesAdminClient extends ElasticsearchClient {
     */
    void open(OpenIndexRequest request, ActionListener<OpenIndexResponse> listener);

    /**
     * Adds a block to an index
     *
     * @param block   The block to add
     * @param indices The name of the indices to add the block to
     */
    AddIndexBlockRequestBuilder prepareAddBlock(APIBlock block, String... indices);

    /**
     * Adds a block to an index
     *
     * @param request  The add index block request
     * @param listener A listener to be notified with a result
     * @see org.elasticsearch.client.Requests#openIndexRequest(String)
     */
    void addBlock(AddIndexBlockRequest request, ActionListener<AddIndexBlockResponse> listener);

    /**
     * Opens one or more indices based on their index name.
     *
@@ -30,9 +30,6 @@ import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplai
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequest;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainRequestBuilder;
import org.elasticsearch.action.admin.cluster.allocation.ClusterAllocationExplainResponse;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthAction;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequest;
import org.elasticsearch.action.admin.cluster.health.ClusterHealthRequestBuilder;
@@ -169,6 +166,9 @@ import org.elasticsearch.action.admin.indices.dangling.import_index.ImportDangli
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesAction;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesRequest;
import org.elasticsearch.action.admin.indices.dangling.list.ListDanglingIndicesResponse;
import org.elasticsearch.action.admin.indices.datastream.CreateDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.DeleteDataStreamAction;
import org.elasticsearch.action.admin.indices.datastream.GetDataStreamAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexAction;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequestBuilder;
@@ -211,6 +211,10 @@ import org.elasticsearch.action.admin.indices.open.OpenIndexAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequestBuilder;
import org.elasticsearch.action.admin.indices.open.OpenIndexResponse;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockAction;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequestBuilder;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
import org.elasticsearch.action.admin.indices.recovery.RecoveryAction;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequest;
import org.elasticsearch.action.admin.indices.recovery.RecoveryRequestBuilder;
@@ -347,6 +351,7 @@ import org.elasticsearch.client.ClusterAdminClient;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.FilterClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
@@ -1438,6 +1443,16 @@ public abstract class AbstractClient implements Client {
        execute(OpenIndexAction.INSTANCE, request, listener);
    }

    @Override
    public AddIndexBlockRequestBuilder prepareAddBlock(APIBlock block, String... indices) {
        return new AddIndexBlockRequestBuilder(this, AddIndexBlockAction.INSTANCE, block, indices);
    }

    @Override
    public void addBlock(AddIndexBlockRequest request, ActionListener<AddIndexBlockResponse> listener) {
        execute(AddIndexBlockAction.INSTANCE, request, listener);
    }

    @Override
    public OpenIndexRequestBuilder prepareOpen(String... indices) {
        return new OpenIndexRequestBuilder(this, OpenIndexAction.INSTANCE, indices);
@@ -86,6 +86,7 @@ public class ClusterBlock implements Writeable, ToXContentFragment {
        return this.id;
    }

    @Nullable
    public String uuid() {
        return uuid;
    }
@@ -42,6 +42,7 @@ import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.settings.Setting;
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
@@ -188,25 +189,80 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen

    public static final String SETTING_AUTO_EXPAND_REPLICAS = "index.auto_expand_replicas";
    public static final Setting<AutoExpandReplicas> INDEX_AUTO_EXPAND_REPLICAS_SETTING = AutoExpandReplicas.SETTING;
    public static final String SETTING_READ_ONLY = "index.blocks.read_only";
    public static final Setting<Boolean> INDEX_READ_ONLY_SETTING =
        Setting.boolSetting(SETTING_READ_ONLY, false, Property.Dynamic, Property.IndexScope);

    public static final String SETTING_BLOCKS_READ = "index.blocks.read";
    public static final Setting<Boolean> INDEX_BLOCKS_READ_SETTING =
        Setting.boolSetting(SETTING_BLOCKS_READ, false, Property.Dynamic, Property.IndexScope);
    public enum APIBlock implements Writeable {
        READ_ONLY("read_only", INDEX_READ_ONLY_BLOCK),
        READ("read", INDEX_READ_BLOCK),
        WRITE("write", INDEX_WRITE_BLOCK),
        METADATA("metadata", INDEX_METADATA_BLOCK),
        READ_ONLY_ALLOW_DELETE("read_only_allow_delete", INDEX_READ_ONLY_ALLOW_DELETE_BLOCK);

    public static final String SETTING_BLOCKS_WRITE = "index.blocks.write";
    public static final Setting<Boolean> INDEX_BLOCKS_WRITE_SETTING =
        Setting.boolSetting(SETTING_BLOCKS_WRITE, false, Property.Dynamic, Property.IndexScope);
        final String name;
        final String settingName;
        final Setting<Boolean> setting;
        final ClusterBlock block;

    public static final String SETTING_BLOCKS_METADATA = "index.blocks.metadata";
    public static final Setting<Boolean> INDEX_BLOCKS_METADATA_SETTING =
        Setting.boolSetting(SETTING_BLOCKS_METADATA, false, Property.Dynamic, Property.IndexScope);
        APIBlock(String name, ClusterBlock block) {
            this.name = name;
            this.settingName = "index.blocks." + name;
            this.setting = Setting.boolSetting(settingName, false, Property.Dynamic, Property.IndexScope);
            this.block = block;
        }

    public static final String SETTING_READ_ONLY_ALLOW_DELETE = "index.blocks.read_only_allow_delete";
    public static final Setting<Boolean> INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING =
        Setting.boolSetting(SETTING_READ_ONLY_ALLOW_DELETE, false, Property.Dynamic, Property.IndexScope);
        public String settingName() {
            return settingName;
        }

        public Setting<Boolean> setting() {
            return setting;
        }

        public ClusterBlock getBlock() {
            return block;
        }

        public static APIBlock fromName(String name) {
            for (APIBlock block : APIBlock.values()) {
                if (block.name.equals(name)) {
                    return block;
                }
            }
            throw new IllegalArgumentException("No block found with name " + name);
        }

        public static APIBlock fromSetting(String settingName) {
            for (APIBlock block : APIBlock.values()) {
                if (block.settingName.equals(settingName)) {
                    return block;
                }
            }
            throw new IllegalArgumentException("No block found with setting name " + settingName);
        }

        @Override
        public void writeTo(StreamOutput out) throws IOException {
            out.writeVInt(ordinal());
        }

        public static APIBlock readFrom(StreamInput input) throws IOException {
            return APIBlock.values()[input.readVInt()];
        }
    }

    public static final String SETTING_READ_ONLY = APIBlock.READ_ONLY.settingName();
    public static final Setting<Boolean> INDEX_READ_ONLY_SETTING = APIBlock.READ_ONLY.setting();

    public static final String SETTING_BLOCKS_READ = APIBlock.READ.settingName();
    public static final Setting<Boolean> INDEX_BLOCKS_READ_SETTING = APIBlock.READ.setting();

    public static final String SETTING_BLOCKS_WRITE = APIBlock.WRITE.settingName();
    public static final Setting<Boolean> INDEX_BLOCKS_WRITE_SETTING = APIBlock.WRITE.setting();

    public static final String SETTING_BLOCKS_METADATA = APIBlock.METADATA.settingName();
    public static final Setting<Boolean> INDEX_BLOCKS_METADATA_SETTING = APIBlock.METADATA.setting();

    public static final String SETTING_READ_ONLY_ALLOW_DELETE = APIBlock.READ_ONLY_ALLOW_DELETE.settingName();
    public static final Setting<Boolean> INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING = APIBlock.READ_ONLY_ALLOW_DELETE.setting();

    public static final String SETTING_VERSION_CREATED = "index.version.created";
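The `APIBlock` enum introduced above derives each block's index setting name from its API name (`"index.blocks." + name`) and provides reverse lookups from both the name and the setting. A dependency-free sketch of just that mapping, with the `ClusterBlock` and `Setting` machinery stripped out (`BlockName` is an illustrative stand-in, not part of the codebase):

```java
// Minimal, self-contained mirror of the APIBlock name/setting mapping.
enum BlockName {
    READ_ONLY("read_only"),
    READ("read"),
    WRITE("write"),
    METADATA("metadata"),
    READ_ONLY_ALLOW_DELETE("read_only_allow_delete");

    final String name;
    final String settingName;

    BlockName(String name) {
        this.name = name;
        // same naming convention as IndexMetadata.APIBlock
        this.settingName = "index.blocks." + name;
    }

    static BlockName fromName(String name) {
        for (BlockName b : values()) {
            if (b.name.equals(name)) {
                return b;
            }
        }
        throw new IllegalArgumentException("No block found with name " + name);
    }

    static BlockName fromSetting(String settingName) {
        for (BlockName b : values()) {
            if (b.settingName.equals(settingName)) {
                return b;
            }
        }
        throw new IllegalArgumentException("No block found with setting name " + settingName);
    }
}
```

This single source of truth is what lets the legacy `SETTING_*` constants below be re-expressed in terms of the enum rather than duplicated string literals.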
@@ -34,6 +34,11 @@ import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.IndexResu
import org.elasticsearch.action.admin.indices.close.CloseIndexResponse.ShardResult;
import org.elasticsearch.action.admin.indices.close.TransportVerifyShardBeforeCloseAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockClusterStateUpdateRequest;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse.AddBlockResult;
import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockResponse.AddBlockShardResult;
import org.elasticsearch.action.admin.indices.readonly.TransportVerifyShardIndexBlockAction;
import org.elasticsearch.action.support.ActiveShardsObserver;
import org.elasticsearch.action.support.replication.ReplicationResponse;
import org.elasticsearch.cluster.AckedClusterStateUpdateTask;
@@ -44,6 +49,7 @@ import org.elasticsearch.cluster.ack.OpenIndexClusterStateUpdateResponse;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetadata.APIBlock;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
@@ -91,7 +97,7 @@ import static java.util.Collections.singleton;
import static java.util.Collections.unmodifiableMap;

/**
 * Service responsible for submitting open/close index requests
 * Service responsible for submitting open/close index requests as well as for adding index blocks
 */
public class MetadataIndexStateService {
    private static final Logger logger = LogManager.getLogger(MetadataIndexStateService.class);
@@ -109,18 +115,21 @@ public class MetadataIndexStateService {
    private final ShardLimitValidator shardLimitValidator;
    private final ThreadPool threadPool;
    private final TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction;
    private final TransportVerifyShardIndexBlockAction transportVerifyShardIndexBlockAction;
    private final ActiveShardsObserver activeShardsObserver;

    @Inject
    public MetadataIndexStateService(ClusterService clusterService, AllocationService allocationService,
                                     MetadataIndexUpgradeService metadataIndexUpgradeService,
                                     IndicesService indicesService, ShardLimitValidator shardLimitValidator, ThreadPool threadPool,
                                     TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction) {
                                     TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction,
                                     TransportVerifyShardIndexBlockAction transportVerifyShardIndexBlockAction) {
        this.indicesService = indicesService;
        this.clusterService = clusterService;
        this.allocationService = allocationService;
        this.threadPool = threadPool;
        this.transportVerifyShardBeforeCloseAction = transportVerifyShardBeforeCloseAction;
        this.transportVerifyShardIndexBlockAction = transportVerifyShardIndexBlockAction;
        this.metadataIndexUpgradeService = metadataIndexUpgradeService;
        this.shardLimitValidator = shardLimitValidator;
        this.activeShardsObserver = new ActiveShardsObserver(clusterService, threadPool);
@@ -303,6 +312,176 @@ public class MetadataIndexStateService {
        return ClusterState.builder(currentState).blocks(blocks).metadata(metadata).routingTable(routingTable.build()).build();
    }

    /**
     * Updates the cluster state for the given indices with the given index block,
     * and also returns the updated indices (and their blocks) in a map.
     * @param indices The indices to add blocks to if needed
     * @param currentState The current cluster state
     * @param block The type of block to add
     * @return a tuple of the updated cluster state, as well as the blocks that got added
     */
    static Tuple<ClusterState, Map<Index, ClusterBlock>> addIndexBlock(final Index[] indices, final ClusterState currentState,
                                                                       final APIBlock block) {
        final Metadata.Builder metadata = Metadata.builder(currentState.metadata());

        final Set<Index> indicesToAddBlock = new HashSet<>();
        for (Index index : indices) {
            metadata.getSafe(index); // to check if index exists
            if (currentState.blocks().hasIndexBlock(index.getName(), block.block)) {
                logger.debug("index {} already has block {}, ignoring", index, block.block);
            } else {
                indicesToAddBlock.add(index);
            }
        }

        if (indicesToAddBlock.isEmpty()) {
            return Tuple.tuple(currentState, Collections.emptyMap());
        }

        final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
        final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
        final Map<Index, ClusterBlock> blockedIndices = new HashMap<>();

        for (Index index : indicesToAddBlock) {
            ClusterBlock indexBlock = null;
            final Set<ClusterBlock> clusterBlocks = currentState.blocks().indices().get(index.getName());
            if (clusterBlocks != null) {
                for (ClusterBlock clusterBlock : clusterBlocks) {
                    if (clusterBlock.id() == block.block.id()) {
                        // Reuse the existing UUID-based block
                        indexBlock = clusterBlock;
                        break;
                    }
                }
            }
            if (indexBlock == null) {
                // Create a new UUID-based block
                indexBlock = createUUIDBasedBlock(block.block);
            }
            assert Strings.hasLength(indexBlock.uuid()) : "Block should have a UUID";
            blocks.addIndexBlock(index.getName(), indexBlock);
            blockedIndices.put(index, indexBlock);
            // update index settings as well to match the block
            final IndexMetadata indexMetadata = metadata.getSafe(index);
            if (block.setting().get(indexMetadata.getSettings()) == false) {
                final Settings updatedSettings = Settings.builder()
                    .put(indexMetadata.getSettings()).put(block.settingName(), true).build();

                metadata.put(IndexMetadata.builder(indexMetadata)
                    .settings(updatedSettings)
                    .settingsVersion(indexMetadata.getSettingsVersion() + 1));
            }
        }

        logger.info("adding block {} to indices {}", block.name,
            blockedIndices.keySet().stream().map(Object::toString).collect(Collectors.toList()));
        return Tuple.tuple(ClusterState.builder(currentState).blocks(blocks).metadata(metadata)
            .routingTable(routingTable.build()).build(), blockedIndices);
    }

    /**
     * Adds an index block based on the given request, and notifies the listener upon completion.
     * Adding blocks is done in three steps:
     * - First, a temporary UUID-based block is added to the index
     *   (see {@link #addIndexBlock(Index[], ClusterState, APIBlock)}.
     * - Second, shards are checked to have properly applied the UUID-based block.
     *   (see {@link WaitForBlocksApplied}).
     * - Third, the temporary UUID-based block is turned into a full block
     *   (see {@link #finalizeBlock(ClusterState, Map, Map, APIBlock)}.
     * Using this three-step process ensures non-interference by other operations in cases where
     * we notify successful completion here.
     */
    public void addIndexBlock(AddIndexBlockClusterStateUpdateRequest request,
                              ActionListener<AddIndexBlockResponse> listener) {
        final Index[] concreteIndices = request.indices();
        if (concreteIndices == null || concreteIndices.length == 0) {
            throw new IllegalArgumentException("Index name is required");
        }
        List<String> writeIndices = new ArrayList<>();
        SortedMap<String, IndexAbstraction> lookup = clusterService.state().metadata().getIndicesLookup();
        for (Index index : concreteIndices) {
            IndexAbstraction ia = lookup.get(index.getName());
            if (ia != null && ia.getParentDataStream() != null && ia.getParentDataStream().getWriteIndex().getIndex().equals(index)) {
                writeIndices.add(index.getName());
            }
        }
        if (writeIndices.size() > 0) {
            throw new IllegalArgumentException("cannot add a block to the following data stream write indices [" +
                Strings.collectionToCommaDelimitedString(writeIndices) + "]");
        }

        clusterService.submitStateUpdateTask("add-index-block-[" + request.getBlock().name + "]-" + Arrays.toString(concreteIndices),
            new ClusterStateUpdateTask(Priority.URGENT) {

                private Map<Index, ClusterBlock> blockedIndices;

                @Override
                public ClusterState execute(final ClusterState currentState) {
                    final Tuple<ClusterState, Map<Index, ClusterBlock>> tup =
                        addIndexBlock(concreteIndices, currentState, request.getBlock());
                    blockedIndices = tup.v2();
                    return tup.v1();
                }

                @Override
                public void clusterStateProcessed(final String source, final ClusterState oldState, final ClusterState newState) {
                    if (oldState == newState) {
                        assert blockedIndices.isEmpty() : "List of blocked indices is not empty but cluster state wasn't changed";
                        listener.onResponse(new AddIndexBlockResponse(true, false, Collections.emptyList()));
                    } else {
                        assert blockedIndices.isEmpty() == false : "List of blocked indices is empty but cluster state was changed";
                        threadPool.executor(ThreadPool.Names.MANAGEMENT)
                            .execute(new WaitForBlocksApplied(blockedIndices, request,
                                ActionListener.wrap(verifyResults ->
                                    clusterService.submitStateUpdateTask("finalize-index-block-[" + request.getBlock().name +
                                        "]-[" + blockedIndices.keySet().stream().map(Index::getName)
                                            .collect(Collectors.joining(", ")) + "]",
                                        new ClusterStateUpdateTask(Priority.URGENT) {
                                            private final List<AddBlockResult> indices = new ArrayList<>();

                                            @Override
                                            public ClusterState execute(final ClusterState currentState) throws Exception {
                                                Tuple<ClusterState, Collection<AddBlockResult>> addBlockResult =
                                                    finalizeBlock(currentState, blockedIndices, verifyResults, request.getBlock());
                                                assert verifyResults.size() == addBlockResult.v2().size();
                                                indices.addAll(addBlockResult.v2());
                                                return addBlockResult.v1();
                                            }

                                            @Override
                                            public void onFailure(final String source, final Exception e) {
                                                listener.onFailure(e);
                                            }

                                            @Override
                                            public void clusterStateProcessed(final String source,
                                                                              final ClusterState oldState,
                                                                              final ClusterState newState) {

                                                final boolean acknowledged = indices.stream().noneMatch(
                                                    AddBlockResult::hasFailures);
                                                listener.onResponse(new AddIndexBlockResponse(acknowledged, acknowledged, indices));
                                            }
                                        }),
                                    listener::onFailure)
                            )
                        );
                    }
                }

                @Override
                public void onFailure(final String source, final Exception e) {
                    listener.onFailure(e);
                }

                @Override
                public TimeValue timeout() {
                    return request.masterNodeTimeout();
                }
            }
        );
    }

    /**
     * Step 2 - Wait for indices to be ready for closing
     * <p>
@@ -428,6 +607,112 @@
        }
    }

    /**
     * Helper class that coordinates with shards to ensure that blocks have been properly applied to all shards using
     * {@link TransportVerifyShardIndexBlockAction}.
     */
    class WaitForBlocksApplied extends ActionRunnable<Map<Index, AddBlockResult>> {

        private final Map<Index, ClusterBlock> blockedIndices;
        private final AddIndexBlockClusterStateUpdateRequest request;

        private WaitForBlocksApplied(final Map<Index, ClusterBlock> blockedIndices,
                                     final AddIndexBlockClusterStateUpdateRequest request,
                                     final ActionListener<Map<Index, AddBlockResult>> listener) {
            super(listener);
            if (blockedIndices == null || blockedIndices.isEmpty()) {
                throw new IllegalArgumentException("Cannot wait for blocks to be applied, list of blocked indices is empty or null");
            }
            this.blockedIndices = blockedIndices;
            this.request = request;
        }

        @Override
        protected void doRun() throws Exception {
            final Map<Index, AddBlockResult> results = ConcurrentCollections.newConcurrentMap();
            final CountDown countDown = new CountDown(blockedIndices.size());
            final ClusterState state = clusterService.state();
            blockedIndices.forEach((index, block) -> {
                waitForShardsReady(index, block, state, response -> {
                    results.put(index, response);
                    if (countDown.countDown()) {
                        listener.onResponse(unmodifiableMap(results));
                    }
                });
            });
        }

        private void waitForShardsReady(final Index index,
                                        final ClusterBlock clusterBlock,
                                        final ClusterState state,
                                        final Consumer<AddBlockResult> onResponse) {
            final IndexMetadata indexMetadata = state.metadata().index(index);
            if (indexMetadata == null) {
                logger.debug("index {} has since been deleted, ignoring", index);
                onResponse.accept(new AddBlockResult(index));
                return;
            }
            final IndexRoutingTable indexRoutingTable = state.routingTable().index(index);
            if (indexRoutingTable == null || indexMetadata.getState() == IndexMetadata.State.CLOSE) {
                logger.debug("index {} is closed, no need to wait for shards, ignoring", index);
                onResponse.accept(new AddBlockResult(index));
                return;
            }

            final ImmutableOpenIntMap<IndexShardRoutingTable> shards = indexRoutingTable.getShards();
            final AtomicArray<AddBlockShardResult> results = new AtomicArray<>(shards.size());
            final CountDown countDown = new CountDown(shards.size());

            for (IntObjectCursor<IndexShardRoutingTable> shard : shards) {
                final IndexShardRoutingTable shardRoutingTable = shard.value;
                final int shardId = shardRoutingTable.shardId().id();
                sendVerifyShardBlockRequest(shardRoutingTable, clusterBlock, new NotifyOnceListener<ReplicationResponse>() {
                    @Override
                    public void innerOnResponse(final ReplicationResponse replicationResponse) {
                        AddBlockShardResult.Failure[] failures = Arrays.stream(replicationResponse.getShardInfo().getFailures())
                            .map(f -> new AddBlockShardResult.Failure(f.index(), f.shardId(), f.getCause(), f.nodeId()))
                            .toArray(AddBlockShardResult.Failure[]::new);
                        results.setOnce(shardId, new AddBlockShardResult(shardId, failures));
                        processIfFinished();
                    }

                    @Override
                    public void innerOnFailure(final Exception e) {
                        AddBlockShardResult.Failure failure = new AddBlockShardResult.Failure(index.getName(), shardId, e);
                        results.setOnce(shardId, new AddBlockShardResult(shardId, new AddBlockShardResult.Failure[]{failure}));
                        processIfFinished();
                    }

                    private void processIfFinished() {
                        if (countDown.countDown()) {
                            onResponse.accept(new AddBlockResult(index, results.toArray(new AddBlockShardResult[results.length()])));
                        }
                    }
                });
            }
        }

        private void sendVerifyShardBlockRequest(final IndexShardRoutingTable shardRoutingTable,
                                                 final ClusterBlock block,
                                                 final ActionListener<ReplicationResponse> listener) {
            final ShardId shardId = shardRoutingTable.shardId();
            if (shardRoutingTable.primaryShard().unassigned()) {
                logger.debug("primary shard {} is unassigned, ignoring", shardId);
                final ReplicationResponse response = new ReplicationResponse();
                response.setShardInfo(new ReplicationResponse.ShardInfo(shardRoutingTable.size(), shardRoutingTable.size()));
                listener.onResponse(response);
                return;
            }
            final TaskId parentTaskId = new TaskId(clusterService.localNode().getId(), request.taskId());
            final TransportVerifyShardIndexBlockAction.ShardRequest shardRequest =
                new TransportVerifyShardIndexBlockAction.ShardRequest(shardId, block, parentTaskId);
            if (request.ackTimeout() != null) {
                shardRequest.timeout(request.ackTimeout());
            }
            transportVerifyShardIndexBlockAction.execute(shardRequest, listener);
        }
    }
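`WaitForBlocksApplied` fans out one verification per blocked index (and, within an index, per shard) and uses Elasticsearch's `CountDown` utility so that exactly one response, the last to arrive, fires the listener with the collected results. Below is a self-contained sketch of that coordination primitive built on `AtomicInteger`; `CountDownSketch` is illustrative only, and like the real `CountDown` it refuses to decrement past zero:

```java
import java.util.concurrent.atomic.AtomicInteger;

// countDown() returns true for exactly one caller: the one whose decrement
// moves the counter to zero. That caller publishes the collected results.
class CountDownSketch {
    private final AtomicInteger remaining;

    CountDownSketch(int count) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        this.remaining = new AtomicInteger(count);
    }

    boolean countDown() {
        // CAS loop so calls after reaching zero stay at zero and return false
        for (;;) {
            int current = remaining.get();
            if (current == 0) {
                return false;
            }
            if (remaining.compareAndSet(current, current - 1)) {
                return current == 1; // true only for the final decrement
            }
        }
    }
}
```

Because the results map is concurrent and only the final `countDown()` returns true, responses may arrive on any thread in any order without racing on the completion callback.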

    /**
     * Step 3 - Move index states from OPEN to CLOSE in cluster state for indices that are ready for closing.
     */
@@ -617,6 +902,68 @@ public class MetadataIndexStateService {
        return ClusterState.builder(updatedState).routingTable(routingTable.build()).build();
    }

    /**
     * Finalizes the addition of blocks by turning the temporary UUID-based blocks into full blocks.
     * @param currentState the cluster state to update
     * @param blockedIndices the indices and their temporary UUID-based blocks to convert
     * @param verifyResult the index-level results for adding the block
     * @param block the full block to convert to
     * @return the updated cluster state, as well as the (failed and successful) index-level results for adding the block
     */
    static Tuple<ClusterState, Collection<AddBlockResult>> finalizeBlock(final ClusterState currentState,
                                                                         final Map<Index, ClusterBlock> blockedIndices,
                                                                         final Map<Index, AddBlockResult> verifyResult,
                                                                         final APIBlock block) {

        final Metadata.Builder metadata = Metadata.builder(currentState.metadata());
        final ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
        final RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());

        final Set<String> effectivelyBlockedIndices = new HashSet<>();
        Map<Index, AddBlockResult> blockingResults = new HashMap<>(verifyResult);
        for (Map.Entry<Index, AddBlockResult> result : verifyResult.entrySet()) {
            final Index index = result.getKey();
            final boolean acknowledged = result.getValue().hasFailures() == false;
            try {
                if (acknowledged == false) {
                    logger.debug("verification of shards before blocking {} failed [{}]", index, result);
                    continue;
                }
                final IndexMetadata indexMetadata = metadata.getSafe(index);
                final ClusterBlock tempBlock = blockedIndices.get(index);
                assert tempBlock != null;
                assert tempBlock.uuid() != null;
                final ClusterBlock currentBlock = currentState.blocks().getIndexBlockWithId(index.getName(), tempBlock.id());
                if (currentBlock != null && currentBlock.equals(block.block)) {
                    logger.debug("verification of shards for {} succeeded, but block finalization already occurred" +
                        " (possibly for another block) [{}]", index, result);
                    continue;
                }

                if (currentBlock == null || currentBlock.equals(tempBlock) == false) {
                    // we should report an error in this case, as the index can be left open.
                    blockingResults.put(result.getKey(), new AddBlockResult(result.getKey(), new IllegalStateException(
                        "verification of shards before blocking " + index + " succeeded but block has been removed in the meantime")));
                    logger.debug("verification of shards before blocking {} succeeded but block has been removed in the meantime", index);
                    continue;
                }

                assert currentBlock != null && currentBlock.equals(tempBlock) && currentBlock.id() == block.block.id();

                blocks.removeIndexBlockWithId(index.getName(), tempBlock.id());
                blocks.addIndexBlock(index.getName(), block.block);

                logger.debug("add block {} to index {} succeeded", block.block, index);
                effectivelyBlockedIndices.add(index.getName());
            } catch (final IndexNotFoundException e) {
                logger.debug("index {} has been deleted since blocking it started, ignoring", index);
            }
        }
        logger.info("completed adding block {} to indices {}", block.name, effectivelyBlockedIndices);
        return Tuple.tuple(ClusterState.builder(currentState).blocks(blocks).metadata(metadata).routingTable(routingTable.build()).build(),
            blockingResults.values());
    }

    /**
     * @return Generates a {@link ClusterBlock} that blocks read and write operations on soon-to-be-closed indices. The
     * cluster block is generated with the id value equals to {@link #INDEX_CLOSED_BLOCK_ID} and a unique UUID.
@@ -632,4 +979,12 @@ public class MetadataIndexStateService {
            && VERIFIED_BEFORE_CLOSE_SETTING.exists(indexMetadata.getSettings())
            && VERIFIED_BEFORE_CLOSE_SETTING.get(indexMetadata.getSettings());
    }

    // Creates a UUID-based block from a non-UUID one
    public static ClusterBlock createUUIDBasedBlock(ClusterBlock clusterBlock) {
        assert clusterBlock.uuid() == null : "no UUID expected on source block";
        return new ClusterBlock(clusterBlock.id(), UUIDs.randomBase64UUID(), "moving to block " + clusterBlock.description(),
            clusterBlock.retryable(), clusterBlock.disableStatePersistence(), clusterBlock.isAllowReleaseResources(), clusterBlock.status(),
            clusterBlock.levels());
    }
}

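The javadoc above describes the two-phase flow: a temporary UUID-based block is added first, shards are verified, and `finalizeBlock` then swaps the temporary block for the full one unless it was removed or already finalized concurrently. As a rough illustration only (this is not the Java implementation, and the dict-based state here is a stand-in for the real cluster-state objects), the finalization step behaves roughly like:

```python
# Illustrative sketch of finalizeBlock's per-index logic, using plain dicts
# as hypothetical stand-ins for ClusterBlocks / AddBlockResult.
def finalize_block(cluster_blocks, temp_blocks, verify_results, full_block):
    results = dict(verify_results)
    for index, verified_ok in verify_results.items():
        if not verified_ok:
            continue  # shard verification failed; keep the failure result as-is
        temp = temp_blocks[index]
        current = cluster_blocks.get(index)
        if current == full_block:
            continue  # already finalized, possibly by a concurrent request
        if current != temp:
            # the temporary block vanished between verification and finalization
            results[index] = "block removed in the meantime"
            continue
        cluster_blocks[index] = full_block  # swap temp block for the full block
    return cluster_blocks, results
```

The interesting cases are the two `continue` branches: finalization is idempotent when the full block is already in place, but reports an error when the temporary block disappeared, since the index may have been left unblocked.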
@@ -178,16 +178,9 @@ public class MetadataUpdateSettingsService {
            }

            ClusterBlocks.Builder blocks = ClusterBlocks.builder().blocks(currentState.blocks());
            maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_READ_ONLY_BLOCK,
                IndexMetadata.INDEX_READ_ONLY_SETTING, openSettings);
            maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_READ_ONLY_ALLOW_DELETE_BLOCK,
                IndexMetadata.INDEX_BLOCKS_READ_ONLY_ALLOW_DELETE_SETTING, openSettings);
            maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_METADATA_BLOCK,
                IndexMetadata.INDEX_BLOCKS_METADATA_SETTING, openSettings);
            maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_WRITE_BLOCK,
                IndexMetadata.INDEX_BLOCKS_WRITE_SETTING, openSettings);
            maybeUpdateClusterBlock(actualIndices, blocks, IndexMetadata.INDEX_READ_BLOCK,
                IndexMetadata.INDEX_BLOCKS_READ_SETTING, openSettings);
            for (IndexMetadata.APIBlock block : IndexMetadata.APIBlock.values()) {
                maybeUpdateClusterBlock(actualIndices, blocks, block.block, block.setting, openSettings);
            }

            if (!openIndices.isEmpty()) {
                for (Index index : openIndices) {

@@ -0,0 +1,61 @@
/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.rest.action.admin.indices;

import org.elasticsearch.action.admin.indices.readonly.AddIndexBlockRequest;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Strings;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.RestToXContentListener;

import java.io.IOException;
import java.util.Collections;
import java.util.List;

import static org.elasticsearch.rest.RestRequest.Method.PUT;

public class RestAddIndexBlockAction extends BaseRestHandler {

    @Override
    public List<Route> routes() {
        return Collections.singletonList(
            new Route(PUT, "/{index}/_block/{block}"));
    }

    @Override
    public String getName() {
        return "add_index_block_action";
    }

    @Override
    public RestChannelConsumer prepareRequest(final RestRequest request, final NodeClient client) throws IOException {
        AddIndexBlockRequest addIndexBlockRequest = new AddIndexBlockRequest(
            IndexMetadata.APIBlock.fromName(request.param("block")),
            Strings.splitStringByCommaToArray(request.param("index")));
        addIndexBlockRequest.masterNodeTimeout(request.paramAsTime("master_timeout", addIndexBlockRequest.masterNodeTimeout()));
        addIndexBlockRequest.timeout(request.paramAsTime("timeout", addIndexBlockRequest.timeout()));
        addIndexBlockRequest.indicesOptions(IndicesOptions.fromRequest(request, addIndexBlockRequest.indicesOptions()));
        return channel -> client.admin().indices().addBlock(addIndexBlockRequest, new RestToXContentListener<>(channel));
    }

}
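The handler above registers `PUT /{index}/_block/{block}` and accepts `master_timeout` and `timeout` parameters. A minimal client-side sketch of calling this endpoint (not part of the PR; the host, index names, and the set of valid block names below are assumptions based on the settings this change wires up) could look like:

```python
# Hypothetical client sketch for the new add-index-block REST endpoint.
# Only the path shape "/{index}/_block/{block}" comes from the diff above;
# the block-name set is an assumption mirroring the index.blocks.* settings.
from urllib import request as urlrequest

ASSUMED_BLOCK_NAMES = {"metadata", "read", "read_only", "read_only_allow_delete", "write"}

def block_path(index: str, block: str) -> str:
    """Build the REST path for adding a block to one or more (comma-separated) indices."""
    if block not in ASSUMED_BLOCK_NAMES:
        raise ValueError(f"unknown block name: {block}")
    return f"/{index}/_block/{block}"

def add_index_block(host: str, index: str, block: str, timeout: str = "30s"):
    """Issue the PUT request; requires a live cluster reachable at `host`."""
    url = f"http://{host}{block_path(index, block)}?timeout={timeout}"
    req = urlrequest.Request(url, method="PUT")
    return urlrequest.urlopen(req)  # caller inspects the acknowledgement body
```

The key property promised by the PR description is that once a `write` block request returns successfully, all in-flight writes to the index have completed, so a caller can safely start a follow-up workflow such as reindexing.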
@@ -413,7 +413,7 @@ public class MetadataIndexStateServiceTests extends ESTestCase {
            Index indexToDelete = cs.metadata().index(indicesToDelete.get(k)).getIndex();
            indicesToDeleteArray[k] = indexToDelete;
        }
        MetadataIndexStateService service = new MetadataIndexStateService(clusterService, null, null, null, null, null, null);
        MetadataIndexStateService service = new MetadataIndexStateService(clusterService, null, null, null, null, null, null, null);
        CloseIndexClusterStateUpdateRequest request = new CloseIndexClusterStateUpdateRequest(0L).indices(indicesToDeleteArray);
        Exception e = expectThrows(IllegalArgumentException.class, () -> service.closeIndices(request, null));
        assertThat(e.getMessage(), CoreMatchers.containsString("cannot close the following data stream write indices [" +

@@ -36,6 +36,7 @@ import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.admin.indices.delete.TransportDeleteIndexAction;
import org.elasticsearch.action.admin.indices.open.OpenIndexRequest;
import org.elasticsearch.action.admin.indices.open.TransportOpenIndexAction;
import org.elasticsearch.action.admin.indices.readonly.TransportVerifyShardIndexBlockAction;
import org.elasticsearch.action.admin.indices.settings.put.TransportUpdateSettingsAction;
import org.elasticsearch.action.admin.indices.settings.put.UpdateSettingsRequest;
import org.elasticsearch.action.support.ActionFilters;
@@ -198,9 +199,12 @@ public class ClusterStateChanges {

        TransportVerifyShardBeforeCloseAction transportVerifyShardBeforeCloseAction = new TransportVerifyShardBeforeCloseAction(SETTINGS,
            transportService, clusterService, indicesService, threadPool, null, actionFilters);
        TransportVerifyShardIndexBlockAction transportVerifyShardIndexBlockAction = new TransportVerifyShardIndexBlockAction(SETTINGS,
            transportService, clusterService, indicesService, threadPool, null, actionFilters);
        ShardLimitValidator shardLimitValidator = new ShardLimitValidator(SETTINGS, clusterService);
        MetadataIndexStateService indexStateService = new MetadataIndexStateService(clusterService, allocationService,
            metadataIndexUpgradeService, indicesService, shardLimitValidator, threadPool, transportVerifyShardBeforeCloseAction);
            metadataIndexUpgradeService, indicesService, shardLimitValidator, threadPool, transportVerifyShardBeforeCloseAction,
            transportVerifyShardIndexBlockAction);
        MetadataDeleteIndexService deleteIndexService = new MetadataDeleteIndexService(SETTINGS, clusterService, allocationService);
        MetadataUpdateSettingsService metadataUpdateSettingsService = new MetadataUpdateSettingsService(clusterService,
            allocationService, IndexScopedSettings.DEFAULT_SCOPED_SETTINGS, indicesService, shardLimitValidator, threadPool);

@@ -1572,8 +1572,12 @@ public abstract class ESIntegTestCase extends ESTestCase {

    /** Enables an index block for the specified index */
    public static void enableIndexBlock(String index, String block) {
        Settings settings = Settings.builder().put(block, true).build();
        client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
        if (randomBoolean()) {
            Settings settings = Settings.builder().put(block, true).build();
            client().admin().indices().prepareUpdateSettings(index).setSettings(settings).get();
        } else {
            client().admin().indices().prepareAddBlock(IndexMetadata.APIBlock.fromSetting(block), index).get();
        }
    }

    /** Sets or unsets the cluster read_only mode **/
