[7.x] Use rollover for SLM's history indices (#45686)

Following our own guidelines, SLM should use rollover instead of purely
time-based indices to keep shard counts low. This commit implements lazy
index creation for SLM's history indices, indexing via an alias, and
rollover in the built-in ILM policy.
This commit is contained in:
Gordon Brown 2019-08-21 13:42:11 -06:00 committed by GitHub
parent c3296d3251
commit 47b1e2b3d0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 321 additions and 63 deletions

View File

@ -9,19 +9,21 @@ package org.elasticsearch.xpack.core.slm.history;
import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client; import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.time.DateFormatter;
import org.elasticsearch.common.xcontent.ToXContent; import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory; import org.elasticsearch.common.xcontent.XContentFactory;
import java.io.IOException; import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING; import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING;
import static org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry.INDEX_TEMPLATE_VERSION; import static org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry.INDEX_TEMPLATE_VERSION;
@ -32,17 +34,17 @@ import static org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplate
*/ */
public class SnapshotHistoryStore { public class SnapshotHistoryStore {
private static final Logger logger = LogManager.getLogger(SnapshotHistoryStore.class); private static final Logger logger = LogManager.getLogger(SnapshotHistoryStore.class);
private static final DateFormatter indexTimeFormat = DateFormatter.forPattern("yyyy.MM");
public static final String SLM_HISTORY_INDEX_PREFIX = ".slm-history-" + INDEX_TEMPLATE_VERSION + "-"; public static final String SLM_HISTORY_INDEX_PREFIX = ".slm-history-" + INDEX_TEMPLATE_VERSION + "-";
public static final String SLM_HISTORY_ALIAS = ".slm-history-" + INDEX_TEMPLATE_VERSION;
private final Client client; private final Client client;
private final ZoneId timeZone; private final ClusterService clusterService;
private final boolean slmHistoryEnabled; private final boolean slmHistoryEnabled;
public SnapshotHistoryStore(Settings nodeSettings, Client client, ZoneId timeZone) { public SnapshotHistoryStore(Settings nodeSettings, Client client, ClusterService clusterService) {
this.client = client; this.client = client;
this.timeZone = timeZone; this.clusterService = clusterService;
slmHistoryEnabled = SLM_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings); slmHistoryEnabled = SLM_HISTORY_INDEX_ENABLED_SETTING.get(nodeSettings);
} }
@ -57,28 +59,84 @@ public class SnapshotHistoryStore {
SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), item); SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), item);
return; return;
} }
final ZonedDateTime dateTime = ZonedDateTime.ofInstant(Instant.ofEpochMilli(item.getTimestamp()), timeZone); logger.trace("about to index snapshot history item in index [{}]: [{}]", SLM_HISTORY_ALIAS, item);
final String index = getHistoryIndexNameForTime(dateTime); ensureHistoryIndex(client, clusterService.state(), ActionListener.wrap(createdIndex -> {
logger.trace("about to index snapshot history item in index [{}]: [{}]", index, item); try (XContentBuilder builder = XContentFactory.jsonBuilder()) {
try (XContentBuilder builder = XContentFactory.jsonBuilder()) { item.toXContent(builder, ToXContent.EMPTY_PARAMS);
item.toXContent(builder, ToXContent.EMPTY_PARAMS); IndexRequest request = new IndexRequest(SLM_HISTORY_ALIAS)
IndexRequest request = new IndexRequest(index) .source(builder);
.source(builder); client.index(request, ActionListener.wrap(indexResponse -> {
client.index(request, ActionListener.wrap(indexResponse -> { logger.debug("successfully indexed snapshot history item with id [{}] in index [{}]: [{}]",
logger.debug("successfully indexed snapshot history item with id [{}] in index [{}]: [{}]", indexResponse.getId(), SLM_HISTORY_ALIAS, item);
indexResponse.getId(), index, item); }, exception -> {
}, exception -> { logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]",
SLM_HISTORY_ALIAS, item), exception);
}));
} catch (IOException exception) {
logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]", logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]",
index, item), exception); SLM_HISTORY_ALIAS, item), exception);
})); }
} catch (IOException exception) { }, ex -> logger.error(new ParameterizedMessage("failed to ensure SLM history index exists, not indexing history item [{}]",
logger.error(new ParameterizedMessage("failed to index snapshot history item in index [{}]: [{}]", item), ex)));
index, item), exception); }
/**
* Checks if the SLM history index exists, and if not, creates it.
*
* @param client The client to use to create the index if needed
* @param state The current cluster state, to determine if the alias exists
* @param andThen Called after the index has been created. `onResponse` called with `true` if the index was created,
* `false` if it already existed.
*/
static void ensureHistoryIndex(Client client, ClusterState state, ActionListener<Boolean> andThen) {
final String initialHistoryIndexName = SLM_HISTORY_INDEX_PREFIX + "000001";
final AliasOrIndex slmHistory = state.metaData().getAliasAndIndexLookup().get(SLM_HISTORY_ALIAS);
final AliasOrIndex initialHistoryIndex = state.metaData().getAliasAndIndexLookup().get(initialHistoryIndexName);
if (slmHistory == null && initialHistoryIndex == null) {
// No alias or index exists with the expected names, so create the index with appropriate alias
client.admin().indices().prepareCreate(initialHistoryIndexName)
.setWaitForActiveShards(1)
.addAlias(new Alias(SLM_HISTORY_ALIAS)
.writeIndex(true))
.execute(new ActionListener<CreateIndexResponse>() {
@Override
public void onResponse(CreateIndexResponse response) {
andThen.onResponse(true);
}
@Override
public void onFailure(Exception e) {
if (e instanceof ResourceAlreadyExistsException) {
// The index didn't exist before we made the call, there was probably a race - just ignore this
logger.debug("index [{}] was created after checking for its existence, likely due to a concurrent call",
initialHistoryIndexName);
andThen.onResponse(false);
} else {
andThen.onFailure(e);
}
}
});
} else if (slmHistory == null) {
// alias does not exist but initial index does, something is broken
andThen.onFailure(new IllegalStateException("SLM history index [" + initialHistoryIndexName +
"] already exists but does not have alias [" + SLM_HISTORY_ALIAS + "]"));
} else if (slmHistory.isAlias() && slmHistory instanceof AliasOrIndex.Alias) {
if (((AliasOrIndex.Alias) slmHistory).getWriteIndex() != null) {
// The alias exists and has a write index, so we're good
andThen.onResponse(false);
} else {
// The alias does not have a write index, so we can't index into it
andThen.onFailure(new IllegalStateException("SLM history alias [" + SLM_HISTORY_ALIAS + "does not have a write index"));
}
} else if (slmHistory.isAlias() == false) {
// This is not an alias, error out
andThen.onFailure(new IllegalStateException("SLM history alias [" + SLM_HISTORY_ALIAS +
"] already exists as concrete index"));
} else {
logger.error("unexpected IndexOrAlias for [{}]: [{}]", SLM_HISTORY_ALIAS, slmHistory);
// (slmHistory.isAlias() == true) but (slmHistory instanceof Alias == false)?
assert false : SLM_HISTORY_ALIAS + " cannot be both an alias and not an alias simultaneously";
} }
} }
static String getHistoryIndexNameForTime(ZonedDateTime time) {
return SLM_HISTORY_INDEX_PREFIX + indexTimeFormat.format(time);
}
} }

View File

@ -1,7 +1,15 @@
{ {
"phases": { "phases": {
"hot": {
"actions": {
"rollover": {
"max_size": "50GB",
"max_age": "30d"
}
}
},
"delete": { "delete": {
"min_age": "60d", "min_age": "90d",
"actions": { "actions": {
"delete": {} "delete": {}
} }

View File

@ -6,28 +6,45 @@
package org.elasticsearch.xpack.core.slm.history; package org.elasticsearch.xpack.core.slm.history;
import org.elasticsearch.ResourceAlreadyExistsException;
import org.elasticsearch.Version;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.LatchedActionListener;
import org.elasticsearch.action.admin.indices.create.CreateIndexAction;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.create.CreateIndexResponse;
import org.elasticsearch.action.index.IndexAction; import org.elasticsearch.action.index.IndexAction;
import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasMetaData;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.test.ClusterServiceUtils;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.threadpool.TestThreadPool; import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy; import org.elasticsearch.xpack.core.slm.SnapshotLifecyclePolicy;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.awaitLatch;
import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING; import static org.elasticsearch.xpack.core.ilm.LifecycleSettings.SLM_HISTORY_INDEX_ENABLED_SETTING;
import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.getHistoryIndexNameForTime; import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_ALIAS;
import static org.elasticsearch.xpack.core.slm.history.SnapshotLifecycleTemplateRegistry.INDEX_TEMPLATE_VERSION; import static org.elasticsearch.xpack.core.slm.history.SnapshotHistoryStore.SLM_HISTORY_INDEX_PREFIX;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.core.IsEqual.equalTo; import static org.hamcrest.core.IsEqual.equalTo;
@ -41,7 +58,8 @@ public class SnapshotHistoryStoreTests extends ESTestCase {
public void setup() { public void setup() {
threadPool = new TestThreadPool(this.getClass().getName()); threadPool = new TestThreadPool(this.getClass().getName());
client = new SnapshotLifecycleTemplateRegistryTests.VerifyingClient(threadPool); client = new SnapshotLifecycleTemplateRegistryTests.VerifyingClient(threadPool);
historyStore = new SnapshotHistoryStore(Settings.EMPTY, client, ZoneOffset.UTC); ClusterService clusterService = ClusterServiceUtils.createClusterService(threadPool);
historyStore = new SnapshotHistoryStore(Settings.EMPTY, client, clusterService);
} }
@After @After
@ -53,7 +71,7 @@ public class SnapshotHistoryStoreTests extends ESTestCase {
public void testNoActionIfDisabled() { public void testNoActionIfDisabled() {
Settings settings = Settings.builder().put(SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build(); Settings settings = Settings.builder().put(SLM_HISTORY_INDEX_ENABLED_SETTING.getKey(), false).build();
SnapshotHistoryStore disabledHistoryStore = new SnapshotHistoryStore(settings, client, ZoneOffset.UTC); SnapshotHistoryStore disabledHistoryStore = new SnapshotHistoryStore(settings, client, null);
String policyId = randomAlphaOfLength(5); String policyId = randomAlphaOfLength(5);
SnapshotLifecyclePolicy policy = randomSnapshotLifecyclePolicy(policyId); SnapshotLifecyclePolicy policy = randomSnapshotLifecyclePolicy(policyId);
final long timestamp = randomNonNegativeLong(); final long timestamp = randomNonNegativeLong();
@ -61,7 +79,7 @@ public class SnapshotHistoryStoreTests extends ESTestCase {
String snapshotId = policy.generateSnapshotName(context); String snapshotId = policy.generateSnapshotName(context);
SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId); SnapshotHistoryItem record = SnapshotHistoryItem.successRecord(timestamp, policy, snapshotId);
client.setVerifier((a,r,l) -> { client.setVerifier((a, r, l) -> {
fail("the history store is disabled, no action should have been taken"); fail("the history store is disabled, no action should have been taken");
return null; return null;
}); });
@ -80,11 +98,14 @@ public class SnapshotHistoryStoreTests extends ESTestCase {
AtomicInteger calledTimes = new AtomicInteger(0); AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> { client.setVerifier((action, request, listener) -> {
if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) {
return new CreateIndexResponse(true, true, ((CreateIndexRequest) request).index());
}
calledTimes.incrementAndGet(); calledTimes.incrementAndGet();
assertThat(action, instanceOf(IndexAction.class)); assertThat(action, instanceOf(IndexAction.class));
assertThat(request, instanceOf(IndexRequest.class)); assertThat(request, instanceOf(IndexRequest.class));
IndexRequest indexRequest = (IndexRequest) request; IndexRequest indexRequest = (IndexRequest) request;
assertEquals(getHistoryIndexNameForTime(Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC)), indexRequest.index()); assertEquals(SLM_HISTORY_ALIAS, indexRequest.index());
final String indexedDocument = indexRequest.source().utf8ToString(); final String indexedDocument = indexRequest.source().utf8ToString();
assertThat(indexedDocument, containsString(policy.getId())); assertThat(indexedDocument, containsString(policy.getId()));
assertThat(indexedDocument, containsString(policy.getRepository())); assertThat(indexedDocument, containsString(policy.getRepository()));
@ -98,9 +119,9 @@ public class SnapshotHistoryStoreTests extends ESTestCase {
new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(100)), new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(100)),
randomAlphaOfLength(5), randomAlphaOfLength(5),
randomAlphaOfLength(5), randomAlphaOfLength(5),
randomLongBetween(1,1000), randomLongBetween(1, 1000),
randomLongBetween(1,1000), randomLongBetween(1, 1000),
randomLongBetween(1,1000), randomLongBetween(1, 1000),
randomBoolean()); randomBoolean());
}); });
@ -115,11 +136,14 @@ public class SnapshotHistoryStoreTests extends ESTestCase {
AtomicInteger calledTimes = new AtomicInteger(0); AtomicInteger calledTimes = new AtomicInteger(0);
client.setVerifier((action, request, listener) -> { client.setVerifier((action, request, listener) -> {
if (action instanceof CreateIndexAction && request instanceof CreateIndexRequest) {
return new CreateIndexResponse(true, true, ((CreateIndexRequest) request).index());
}
calledTimes.incrementAndGet(); calledTimes.incrementAndGet();
assertThat(action, instanceOf(IndexAction.class)); assertThat(action, instanceOf(IndexAction.class));
assertThat(request, instanceOf(IndexRequest.class)); assertThat(request, instanceOf(IndexRequest.class));
IndexRequest indexRequest = (IndexRequest) request; IndexRequest indexRequest = (IndexRequest) request;
assertEquals(getHistoryIndexNameForTime(Instant.ofEpochMilli(timestamp).atZone(ZoneOffset.UTC)), indexRequest.index()); assertEquals(SLM_HISTORY_ALIAS, indexRequest.index());
final String indexedDocument = indexRequest.source().utf8ToString(); final String indexedDocument = indexRequest.source().utf8ToString();
assertThat(indexedDocument, containsString(policy.getId())); assertThat(indexedDocument, containsString(policy.getId()));
assertThat(indexedDocument, containsString(policy.getRepository())); assertThat(indexedDocument, containsString(policy.getRepository()));
@ -135,9 +159,9 @@ public class SnapshotHistoryStoreTests extends ESTestCase {
new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(100)), new ShardId(randomAlphaOfLength(5), randomAlphaOfLength(5), randomInt(100)),
randomAlphaOfLength(5), randomAlphaOfLength(5),
randomAlphaOfLength(5), randomAlphaOfLength(5),
randomLongBetween(1,1000), randomLongBetween(1, 1000),
randomLongBetween(1,1000), randomLongBetween(1, 1000),
randomLongBetween(1,1000), randomLongBetween(1, 1000),
randomBoolean()); randomBoolean());
}); });
@ -146,13 +170,188 @@ public class SnapshotHistoryStoreTests extends ESTestCase {
} }
} }
public void testHistoryIndexNeedsCreation() throws InterruptedException {
ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5)))
.metaData(MetaData.builder())
.build();
client.setVerifier((a, r, l) -> {
assertThat(a, instanceOf(CreateIndexAction.class));
assertThat(r, instanceOf(CreateIndexRequest.class));
CreateIndexRequest request = (CreateIndexRequest) r;
assertThat(request.aliases(), hasSize(1));
request.aliases().forEach(alias -> {
assertThat(alias.name(), equalTo(SLM_HISTORY_ALIAS));
assertTrue(alias.writeIndex());
});
return new CreateIndexResponse(true, true, request.index());
});
CountDownLatch latch = new CountDownLatch(1);
SnapshotHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap(
Assert::assertTrue,
ex -> {
logger.error(ex);
fail("should have called onResponse, not onFailure");
}), latch));
awaitLatch(latch, 10, TimeUnit.SECONDS);
}
public void testHistoryIndexProperlyExistsAlready() throws InterruptedException {
ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5)))
.metaData(MetaData.builder()
.put(IndexMetaData.builder(SLM_HISTORY_INDEX_PREFIX + "000001")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(randomIntBetween(1,10))
.numberOfReplicas(randomIntBetween(1,10))
.putAlias(AliasMetaData.builder(SLM_HISTORY_ALIAS)
.writeIndex(true)
.build())))
.build();
client.setVerifier((a, r, l) -> {
fail("no client calls should have been made");
return null;
});
CountDownLatch latch = new CountDownLatch(1);
SnapshotHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap(
Assert::assertFalse,
ex -> {
logger.error(ex);
fail("should have called onResponse, not onFailure");
}), latch));
awaitLatch(latch, 10, TimeUnit.SECONDS);
}
public void testHistoryIndexHasNoWriteIndex() throws InterruptedException {
ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5)))
.metaData(MetaData.builder()
.put(IndexMetaData.builder(SLM_HISTORY_INDEX_PREFIX + "000001")
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(randomIntBetween(1,10))
.numberOfReplicas(randomIntBetween(1,10))
.putAlias(AliasMetaData.builder(SLM_HISTORY_ALIAS)
.build()))
.put(IndexMetaData.builder(randomAlphaOfLength(5))
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(randomIntBetween(1,10))
.numberOfReplicas(randomIntBetween(1,10))
.putAlias(AliasMetaData.builder(SLM_HISTORY_ALIAS)
.build())))
.build();
client.setVerifier((a, r, l) -> {
fail("no client calls should have been made");
return null;
});
CountDownLatch latch = new CountDownLatch(1);
SnapshotHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap(
indexCreated -> fail("should have called onFailure, not onResponse"),
ex -> {
assertThat(ex, instanceOf(IllegalStateException.class));
assertThat(ex.getMessage(), containsString("SLM history alias [" + SLM_HISTORY_ALIAS +
"does not have a write index"));
}), latch));
awaitLatch(latch, 10, TimeUnit.SECONDS);
}
public void testHistoryIndexNotAlias() throws InterruptedException {
ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5)))
.metaData(MetaData.builder()
.put(IndexMetaData.builder(SLM_HISTORY_ALIAS)
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(randomIntBetween(1,10))
.numberOfReplicas(randomIntBetween(1,10))))
.build();
client.setVerifier((a, r, l) -> {
fail("no client calls should have been made");
return null;
});
CountDownLatch latch = new CountDownLatch(1);
SnapshotHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap(
indexCreated -> fail("should have called onFailure, not onResponse"),
ex -> {
assertThat(ex, instanceOf(IllegalStateException.class));
assertThat(ex.getMessage(), containsString("SLM history alias [" + SLM_HISTORY_ALIAS +
"] already exists as concrete index"));
}), latch));
awaitLatch(latch, 10, TimeUnit.SECONDS);
}
public void testHistoryIndexCreatedConcurrently() throws InterruptedException {
ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5)))
.metaData(MetaData.builder())
.build();
client.setVerifier((a, r, l) -> {
assertThat(a, instanceOf(CreateIndexAction.class));
assertThat(r, instanceOf(CreateIndexRequest.class));
CreateIndexRequest request = (CreateIndexRequest) r;
assertThat(request.aliases(), hasSize(1));
request.aliases().forEach(alias -> {
assertThat(alias.name(), equalTo(SLM_HISTORY_ALIAS));
assertTrue(alias.writeIndex());
});
throw new ResourceAlreadyExistsException("that index already exists");
});
CountDownLatch latch = new CountDownLatch(1);
SnapshotHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap(
Assert::assertFalse,
ex -> {
logger.error(ex);
fail("should have called onResponse, not onFailure");
}), latch));
awaitLatch(latch, 10, TimeUnit.SECONDS);
}
public void testHistoryAliasDoesntExistButIndexDoes() throws InterruptedException {
final String initialIndex = SLM_HISTORY_INDEX_PREFIX + "000001";
ClusterState state = ClusterState.builder(new ClusterName(randomAlphaOfLength(5)))
.metaData(MetaData.builder()
.put(IndexMetaData.builder(initialIndex)
.settings(Settings.builder().put(IndexMetaData.SETTING_VERSION_CREATED, Version.CURRENT))
.numberOfShards(randomIntBetween(1,10))
.numberOfReplicas(randomIntBetween(1,10))))
.build();
client.setVerifier((a, r, l) -> {
fail("no client calls should have been made");
return null;
});
CountDownLatch latch = new CountDownLatch(1);
SnapshotHistoryStore.ensureHistoryIndex(client, state, new LatchedActionListener<>(ActionListener.wrap(
response -> {
logger.error(response);
fail("should have called onFailure, not onResponse");
},
ex -> {
assertThat(ex, instanceOf(IllegalStateException.class));
assertThat(ex.getMessage(), containsString("SLM history index [" + initialIndex +
"] already exists but does not have alias [" + SLM_HISTORY_ALIAS + "]"));
}), latch));
awaitLatch(latch, 10, TimeUnit.SECONDS);
}
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
private void assertContainsMap(String indexedDocument, Map<String, Object> map) { private void assertContainsMap(String indexedDocument, Map<String, Object> map) {
map.forEach((k, v) -> { map.forEach((k, v) -> {
assertThat(indexedDocument, containsString(k)); assertThat(indexedDocument, containsString(k));
if (v instanceof Map) { if (v instanceof Map) {
assertContainsMap(indexedDocument, (Map<String, Object>) v); assertContainsMap(indexedDocument, (Map<String, Object>) v);
} if (v instanceof Iterable) { }
if (v instanceof Iterable) {
((Iterable) v).forEach(elem -> { ((Iterable) v).forEach(elem -> {
assertThat(indexedDocument, containsString(elem.toString())); assertThat(indexedDocument, containsString(elem.toString()));
}); });
@ -162,19 +361,6 @@ public class SnapshotHistoryStoreTests extends ESTestCase {
}); });
} }
public void testIndexNameGeneration() {
String indexTemplateVersion = INDEX_TEMPLATE_VERSION;
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli((long) 0).atZone(ZoneOffset.UTC)),
equalTo(".slm-history-"+ indexTemplateVersion +"-1970.01"));
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(100000000000L).atZone(ZoneOffset.UTC)),
equalTo(".slm-history-" + indexTemplateVersion + "-1973.03"));
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(1416582852000L).atZone(ZoneOffset.UTC)),
equalTo(".slm-history-" + indexTemplateVersion + "-2014.11"));
assertThat(getHistoryIndexNameForTime(Instant.ofEpochMilli(2833165811000L).atZone(ZoneOffset.UTC)),
equalTo(".slm-history-" + indexTemplateVersion + "-2059.10"));
}
public static SnapshotLifecyclePolicy randomSnapshotLifecyclePolicy(String id) { public static SnapshotLifecyclePolicy randomSnapshotLifecyclePolicy(String id) {
Map<String, Object> config = null; Map<String, Object> config = null;
if (randomBoolean()) { if (randomBoolean()) {

View File

@ -44,6 +44,7 @@ import org.elasticsearch.xpack.core.ilm.LifecyclePolicy;
import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata; import org.elasticsearch.xpack.core.ilm.LifecyclePolicyMetadata;
import org.elasticsearch.xpack.core.ilm.LifecycleType; import org.elasticsearch.xpack.core.ilm.LifecycleType;
import org.elasticsearch.xpack.core.ilm.OperationMode; import org.elasticsearch.xpack.core.ilm.OperationMode;
import org.elasticsearch.xpack.core.ilm.RolloverAction;
import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType; import org.elasticsearch.xpack.core.ilm.TimeseriesLifecycleType;
import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction; import org.elasticsearch.xpack.core.ilm.action.PutLifecycleAction;
import org.junit.After; import org.junit.After;
@ -85,6 +86,7 @@ public class SnapshotLifecycleTemplateRegistryTests extends ESTestCase {
entries.addAll(Arrays.asList( entries.addAll(Arrays.asList(
new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE), new NamedXContentRegistry.Entry(LifecycleType.class, new ParseField(TimeseriesLifecycleType.TYPE),
(p) -> TimeseriesLifecycleType.INSTANCE), (p) -> TimeseriesLifecycleType.INSTANCE),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(RolloverAction.NAME), RolloverAction::parse),
new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse))); new NamedXContentRegistry.Entry(LifecycleAction.class, new ParseField(DeleteAction.NAME), DeleteAction::parse)));
xContentRegistry = new NamedXContentRegistry(entries); xContentRegistry = new NamedXContentRegistry(entries);
registry = new SnapshotLifecycleTemplateRegistry(Settings.EMPTY, clusterService, threadPool, client, xContentRegistry); registry = new SnapshotLifecycleTemplateRegistry(Settings.EMPTY, clusterService, threadPool, client, xContentRegistry);
@ -276,7 +278,11 @@ public class SnapshotLifecycleTemplateRegistryTests extends ESTestCase {
protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action, protected <Request extends ActionRequest, Response extends ActionResponse> void doExecute(ActionType<Response> action,
Request request, Request request,
ActionListener<Response> listener) { ActionListener<Response> listener) {
listener.onResponse((Response) verifier.apply(action, request, listener)); try {
listener.onResponse((Response) verifier.apply(action, request, listener));
} catch (Exception e) {
listener.onFailure(e);
}
} }
public VerifyingClient setVerifier(TriFunction<ActionType<?>, ActionRequest, ActionListener<?>, ActionResponse> verifier) { public VerifyingClient setVerifier(TriFunction<ActionType<?>, ActionRequest, ActionListener<?>, ActionResponse> verifier) {

View File

@ -164,8 +164,8 @@ public class IndexLifecycle extends Plugin implements ActionPlugin {
getClock(), System::currentTimeMillis, xContentRegistry)); getClock(), System::currentTimeMillis, xContentRegistry));
SnapshotLifecycleTemplateRegistry templateRegistry = new SnapshotLifecycleTemplateRegistry(settings, clusterService, threadPool, SnapshotLifecycleTemplateRegistry templateRegistry = new SnapshotLifecycleTemplateRegistry(settings, clusterService, threadPool,
client, xContentRegistry); client, xContentRegistry);
snapshotHistoryStore.set(new SnapshotHistoryStore(settings, new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN), snapshotHistoryStore.set(new SnapshotHistoryStore(settings, new OriginSettingClient(client, INDEX_LIFECYCLE_ORIGIN), clusterService
getClock().getZone())); ));
snapshotLifecycleService.set(new SnapshotLifecycleService(settings, snapshotLifecycleService.set(new SnapshotLifecycleService(settings,
() -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock())); () -> new SnapshotLifecycleTask(client, clusterService, snapshotHistoryStore.get()), clusterService, getClock()));
return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get()); return Arrays.asList(indexLifecycleInitialisationService.get(), snapshotLifecycleService.get(), snapshotHistoryStore.get());

View File

@ -232,7 +232,7 @@ public class SnapshotLifecycleTaskTests extends ESTestCase {
Consumer<SnapshotHistoryItem> verifier; Consumer<SnapshotHistoryItem> verifier;
public VerifyingHistoryStore(Client client, ZoneId timeZone, Consumer<SnapshotHistoryItem> verifier) { public VerifyingHistoryStore(Client client, ZoneId timeZone, Consumer<SnapshotHistoryItem> verifier) {
super(Settings.EMPTY, client, timeZone); super(Settings.EMPTY, client, null);
this.verifier = verifier; this.verifier = verifier;
} }