mirror of
https://github.com/honeymoose/OpenSearch.git
synced 2025-02-24 13:55:57 +00:00
Skip Rollover step if next index already exists (#35168)
If the Rollover step would fail due to the next index in sequence already existing, just skip to the next step instead of going to the Error step. This prevents spurious `ResourceAlreadyExistsException`s created by simultaneous RolloverStep executions from causing ILM to error out unnecessarily.
This commit is contained in:
parent
4f35eea8fe
commit
0fbb8a16bc
@ -5,6 +5,10 @@
|
||||
*/
|
||||
package org.elasticsearch.xpack.core.indexlifecycle;
|
||||
|
||||
import org.apache.logging.log4j.LogManager;
|
||||
import org.apache.logging.log4j.Logger;
|
||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||
import org.elasticsearch.ResourceAlreadyExistsException;
|
||||
import org.elasticsearch.action.ActionListener;
|
||||
import org.elasticsearch.action.admin.indices.rollover.RolloverRequest;
|
||||
import org.elasticsearch.client.Client;
|
||||
@ -22,6 +26,8 @@ import java.util.Objects;
|
||||
public class RolloverStep extends AsyncWaitStep {
|
||||
public static final String NAME = "attempt_rollover";
|
||||
|
||||
private static final Logger logger = LogManager.getLogger(RolloverStep.class);
|
||||
|
||||
private ByteSizeValue maxSize;
|
||||
private TimeValue maxAge;
|
||||
private Long maxDocs;
|
||||
@ -63,7 +69,21 @@ public class RolloverStep extends AsyncWaitStep {
|
||||
rolloverRequest.addMaxIndexDocsCondition(maxDocs);
|
||||
}
|
||||
getClient().admin().indices().rolloverIndex(rolloverRequest,
|
||||
ActionListener.wrap(response -> listener.onResponse(response.isRolledOver(), new EmptyInfo()), listener::onFailure));
|
||||
ActionListener.wrap(response -> listener.onResponse(response.isRolledOver(), new EmptyInfo()), exception -> {
|
||||
if (exception instanceof ResourceAlreadyExistsException) {
|
||||
// This can happen sometimes when this step is executed multiple times
|
||||
if (logger.isTraceEnabled()) {
|
||||
logger.debug(() -> new ParameterizedMessage("{} index cannot roll over because the next index already exists, " +
|
||||
"skipping to next step", indexMetaData.getIndex()), exception);
|
||||
} else {
|
||||
logger.debug("{} index cannot roll over because the next index already exists, skipping to next step",
|
||||
indexMetaData.getIndex());
|
||||
}
|
||||
listener.onResponse(true, new EmptyInfo());
|
||||
} else {
|
||||
listener.onFailure(exception);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
ByteSizeValue getMaxSize() {
|
||||
|
@ -32,7 +32,8 @@ public class UpdateRolloverLifecycleDateStep extends ClusterStateActionStep {
|
||||
}
|
||||
RolloverInfo rolloverInfo = indexMetaData.getRolloverInfos().get(rolloverAlias);
|
||||
if (rolloverInfo == null) {
|
||||
throw new IllegalStateException("index [" + indexMetaData.getIndex().getName() + "] has not rolled over yet");
|
||||
throw new IllegalStateException("no rollover info found for [" + indexMetaData.getIndex().getName() + "], either the index " +
|
||||
"has not yet rolled over or a subsequent index was created outside of Index Lifecycle Management");
|
||||
}
|
||||
|
||||
LifecycleExecutionState.Builder newLifecycleState = LifecycleExecutionState
|
||||
|
@ -87,7 +87,8 @@ public class UpdateRolloverLifecycleDateStepTests extends AbstractStepTestCase<U
|
||||
IllegalStateException exceptionThrown = expectThrows(IllegalStateException.class,
|
||||
() -> step.performAction(indexMetaData.getIndex(), clusterState));
|
||||
assertThat(exceptionThrown.getMessage(),
|
||||
equalTo("index [" + indexMetaData.getIndex().getName() + "] has not rolled over yet"));
|
||||
equalTo("no rollover info found for [" + indexMetaData.getIndex().getName() + "], either the index " +
|
||||
"has not yet rolled over or a subsequent index was created outside of Index Lifecycle Management"));
|
||||
}
|
||||
|
||||
public void testPerformActionWithNoRolloverAliasSetting() {
|
||||
|
@ -22,6 +22,7 @@ import org.elasticsearch.common.xcontent.XContentType;
|
||||
import org.elasticsearch.test.rest.ESRestTestCase;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.AllocateAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.DeleteAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ErrorStep;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.ForceMergeAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecycleAction;
|
||||
import org.elasticsearch.xpack.core.indexlifecycle.LifecyclePolicy;
|
||||
@ -192,6 +193,39 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||
assertBusy(() -> assertTrue(indexExists(originalIndex)));
|
||||
}
|
||||
|
||||
public void testRolloverAlreadyExists() throws Exception {
|
||||
String originalIndex = index + "-000001";
|
||||
String secondIndex = index + "-000002";
|
||||
createIndexWithSettings(originalIndex, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0)
|
||||
.put(RolloverAction.LIFECYCLE_ROLLOVER_ALIAS, "alias"));
|
||||
|
||||
// create policy
|
||||
createNewSingletonPolicy("hot", new RolloverAction(null, null, 1L));
|
||||
// update policy on index
|
||||
updatePolicy(originalIndex, policy);
|
||||
|
||||
// Manually create the new index
|
||||
Request request = new Request("PUT", "/" + secondIndex);
|
||||
request.setJsonEntity("{\n \"settings\": " + Strings.toString(Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0).build()) + "}");
|
||||
client().performRequest(request);
|
||||
// wait for the shards to initialize
|
||||
ensureGreen(secondIndex);
|
||||
|
||||
// index another doc to trigger the policy
|
||||
index(client(), originalIndex, "_id", "foo", "bar");
|
||||
|
||||
assertBusy(() -> {
|
||||
logger.info(originalIndex + ": " + getStepKeyForIndex(originalIndex));
|
||||
logger.info(secondIndex + ": " + getStepKeyForIndex(secondIndex));
|
||||
assertThat(getStepKeyForIndex(originalIndex), equalTo(new StepKey("hot", RolloverAction.NAME, ErrorStep.NAME)));
|
||||
assertThat(getFailedStepForIndex(originalIndex), equalTo("update-rollover-lifecycle-date"));
|
||||
assertThat(getReasonForIndex(originalIndex), equalTo("no rollover info found for [" + originalIndex + "], either the index " +
|
||||
"has not yet rolled over or a subsequent index was created outside of Index Lifecycle Management"));
|
||||
});
|
||||
}
|
||||
|
||||
public void testAllocateOnlyAllocation() throws Exception {
|
||||
createIndexWithSettings(index, Settings.builder().put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 2)
|
||||
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 0));
|
||||
@ -436,6 +470,33 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||
}
|
||||
|
||||
private StepKey getStepKeyForIndex(String indexName) throws IOException {
|
||||
Map<String, Object> indexResponse = explainIndex(indexName);
|
||||
if (indexResponse == null) {
|
||||
return new StepKey(null, null, null);
|
||||
}
|
||||
|
||||
String phase = (String) indexResponse.get("phase");
|
||||
String action = (String) indexResponse.get("action");
|
||||
String step = (String) indexResponse.get("step");
|
||||
return new StepKey(phase, action, step);
|
||||
}
|
||||
|
||||
private String getFailedStepForIndex(String indexName) throws IOException {
|
||||
Map<String, Object> indexResponse = explainIndex(indexName);
|
||||
if (indexResponse == null) return null;
|
||||
|
||||
return (String) indexResponse.get("failed_step");
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private String getReasonForIndex(String indexName) throws IOException {
|
||||
Map<String, Object> indexResponse = explainIndex(indexName);
|
||||
if (indexResponse == null) return null;
|
||||
|
||||
return ((Map<String, String>) indexResponse.get("step_info")).get("reason");
|
||||
}
|
||||
|
||||
private Map<String, Object> explainIndex(String indexName) throws IOException {
|
||||
Request explainRequest = new Request("GET", indexName + "/_ilm/explain");
|
||||
Response response = client().performRequest(explainRequest);
|
||||
Map<String, Object> responseMap;
|
||||
@ -443,15 +504,9 @@ public class TimeSeriesLifecycleActionsIT extends ESRestTestCase {
|
||||
responseMap = XContentHelper.convertToMap(XContentType.JSON.xContent(), is, true);
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked") Map<String, String> indexResponse = ((Map<String, Map<String, String>>) responseMap.get("indices"))
|
||||
@SuppressWarnings("unchecked") Map<String, Object> indexResponse = ((Map<String, Map<String, Object>>) responseMap.get("indices"))
|
||||
.get(indexName);
|
||||
if (indexResponse == null) {
|
||||
return new StepKey(null, null, null);
|
||||
}
|
||||
String phase = indexResponse.get("phase");
|
||||
String action = indexResponse.get("action");
|
||||
String step = indexResponse.get("step");
|
||||
return new StepKey(phase, action, step);
|
||||
return indexResponse;
|
||||
}
|
||||
|
||||
private void indexDocument() throws IOException {
|
||||
|
Loading…
x
Reference in New Issue
Block a user