From e36f86cf953dba32050c0556fd5605fb58c66196 Mon Sep 17 00:00:00 2001 From: Tim Vernum Date: Thu, 15 Jun 2017 10:27:23 +1000 Subject: [PATCH] Improve upgrade process for reserved users (elastic/x-pack-elasticsearch#1712) - Don't attempt to upgrade from 2.x - Attempt up to 10 retries if the migration fails (with increasing back-off between attempts) - If a cached user is disabled, recheck with the underlying store The last change is required if the migration takes a long time. While users are being migrated, they might be marked as disabled, but when the migration is complete they need to be usable immediately. Original commit: elastic/x-pack-elasticsearch@2621867014149b6f922c8f51cae72cf9fa026ab8 --- .../security/SecurityLifecycleService.java | 9 +- .../authc/esnative/NativeRealmMigrator.java | 10 +- .../support/CachingUsernamePasswordRealm.java | 19 +++- .../support/IndexLifecycleManager.java | 81 ++++++++++------ .../esnative/NativeRealmMigratorTests.java | 4 + .../CachingUsernamePasswordRealmTests.java | 41 ++++++++- .../support/IndexLifecycleManagerTests.java | 92 ++++++++++++++++++- 7 files changed, 213 insertions(+), 43 deletions(-) diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java index 9788ff092d9..7d4a1456017 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/SecurityLifecycleService.java @@ -62,15 +62,14 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust } // package private for testing - SecurityLifecycleService(Settings settings, ClusterService clusterService, - ThreadPool threadPool, InternalClient client, - NativeRealmMigrator migrator, - @Nullable IndexAuditTrail indexAuditTrail) { + SecurityLifecycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool, InternalClient client, + NativeRealmMigrator migrator, @Nullable IndexAuditTrail indexAuditTrail) { super(settings); this.settings = settings; this.threadPool = threadPool; this.indexAuditTrail = indexAuditTrail; - this.securityIndex = new IndexLifecycleManager(settings, client, SECURITY_INDEX_NAME, SECURITY_TEMPLATE_NAME, migrator); + this.securityIndex = new IndexLifecycleManager(settings, client, clusterService, threadPool, SECURITY_INDEX_NAME, + SECURITY_TEMPLATE_NAME, migrator); clusterService.addListener(this); clusterService.addLifecycleListener(new LifecycleListener() { @Override diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmMigrator.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmMigrator.java index 21dcb4af39d..dc9971b9147 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmMigrator.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmMigrator.java @@ -83,7 +83,7 @@ public class NativeRealmMigrator implements IndexLifecycleManager.IndexDataMigra final GroupedActionListener countDownListener = new GroupedActionListener<>( ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure), tasks.size(), emptyList() ); - logger.info("Performing {} security migration task(s)", tasks.size()); + logger.info("Performing {} security migration task(s) from version {}", tasks.size(), previousVersion); tasks.forEach(t -> t.accept(previousVersion, countDownListener)); } } catch (Exception e) { @@ -111,7 +111,9 @@ public class NativeRealmMigrator implements IndexLifecycleManager.IndexDataMigra * installs but problematic for already-locked-down upgrades. */ private boolean isNewUser(@Nullable Version previousVersion, BuiltinUserInfo info) { - return previousVersion != null && previousVersion.before(info.getDefinedSince()); + return previousVersion != null + && previousVersion.before(info.getDefinedSince()) + && previousVersion.onOrAfter(Version.V_5_0_0); } private void createUserAsDisabled(BuiltinUserInfo info, @Nullable Version previousVersion, ActionListener listener) { @@ -151,7 +153,9 @@ public class NativeRealmMigrator implements IndexLifecycleManager.IndexDataMigra * does the right thing. */ private boolean shouldConvertDefaultPasswords(@Nullable Version previousVersion) { - return previousVersion != null && previousVersion.before(Version.V_6_0_0_alpha1); + return previousVersion != null + && previousVersion.before(Version.V_6_0_0_alpha1) + && previousVersion.onOrAfter(Version.V_5_0_0); } @SuppressWarnings("unused") diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java index babb74b1f5e..40728e31ab3 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java @@ -100,9 +100,22 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm }, listener::onFailure)); } else if (userWithHash.hasHash()) { if (userWithHash.verify(token.credentials())) { - logger.debug("realm [{}] authenticated user [{}], with roles [{}]", name(), token.principal(), - userWithHash.user.roles()); - listener.onResponse(userWithHash.user); + if (userWithHash.user.enabled()) { + User user = userWithHash.user; + logger.debug("realm [{}] authenticated user [{}], with roles [{}]", name(), token.principal(), user.roles()); + listener.onResponse(user); + } else { + // We successfully authenticated, but the cached user is disabled. + // Reload the primary record to check whether the user is still disabled + cache.invalidate(token.principal()); + doAuthenticateAndCache(token, ActionListener.wrap((user) -> { + if (user != null) { + logger.debug("realm [{}] authenticated user [{}] (enabled:{}), with roles [{}]", name(), token.principal(), + user.enabled(), user.roles()); + } + listener.onResponse(user); + }, listener::onFailure)); + } } else { cache.invalidate(token.principal()); doAuthenticateAndCache(token, ActionListener.wrap((user) -> { diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java index dfe3151ccf3..213a02899fe 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/support/IndexLifecycleManager.java @@ -5,6 +5,19 @@ */ package org.elasticsearch.xpack.security.support; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Predicate; +import java.util.regex.Pattern; +import java.util.stream.Collectors; + import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.util.Supplier; @@ -20,35 +33,23 @@ import org.elasticsearch.cluster.ClusterChangedEvent; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.IndexMetaData; -import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.bytes.BytesArray; -import org.elasticsearch.common.collect.ImmutableOpenMap; import org.elasticsearch.common.component.AbstractComponent; -import org.elasticsearch.common.compress.CompressedXContent; import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.index.mapper.MapperService; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.template.TemplateUtils; -import java.io.IOException; -import java.nio.charset.StandardCharsets; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicReference; -import java.util.function.Predicate; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap; /** @@ -60,15 +61,20 @@ public class IndexLifecycleManager extends AbstractComponent { public static final String TEMPLATE_VERSION_PATTERN = Pattern.quote("${security.template.version}"); + private static final int MAX_MIGRATE_ATTEMPTS = 10; + private final String indexName; private final String templateName; private final InternalClient client; private final IndexDataMigrator migrator; + private final ClusterService clusterService; + private final ThreadPool threadPool; private final AtomicBoolean templateCreationPending = new AtomicBoolean(false); private final AtomicBoolean updateMappingPending = new AtomicBoolean(false); private final AtomicReference migrateDataState = new AtomicReference<>(UpgradeState.NOT_STARTED); + private final AtomicInteger migrateDataAttempts = new AtomicInteger(0); private volatile boolean templateIsUpToDate; private volatile boolean indexExists; @@ -85,16 +91,17 @@ public class IndexLifecycleManager extends AbstractComponent { void performUpgrade(@Nullable Version previousVersion, ActionListener listener); } - public static final IndexDataMigrator NULL_MIGRATOR = - (version, listener) -> listener.onResponse(false); + public static final IndexDataMigrator NULL_MIGRATOR = (version, listener) -> listener.onResponse(false); - public IndexLifecycleManager(Settings settings, InternalClient client, String indexName, - String templateName, IndexDataMigrator migrator) { + public IndexLifecycleManager(Settings settings, InternalClient client, ClusterService clusterService, ThreadPool threadPool, + String indexName, String templateName, IndexDataMigrator migrator) { super(settings); this.client = client; this.indexName = indexName; this.templateName = templateName; this.migrator = migrator; + this.clusterService = clusterService; + this.threadPool = threadPool; } public boolean isTemplateUpToDate() { @@ -139,15 +146,20 @@ public class IndexLifecycleManager extends AbstractComponent { public void clusterChanged(ClusterChangedEvent event) { final ClusterState state = event.state(); - this.indexExists = resolveConcreteIndex(indexName, event.state().metaData()) != null; + processClusterState(state); + } + + private void processClusterState(ClusterState state) { + assert state != null; + this.indexExists = resolveConcreteIndex(indexName, state.metaData()) != null; this.indexAvailable = checkIndexAvailable(state); this.templateIsUpToDate = TemplateUtils.checkTemplateExistsAndIsUpToDate(templateName, SECURITY_VERSION_STRING, state, logger); this.mappingIsUpToDate = checkIndexMappingUpToDate(state); this.canWriteToIndex = templateIsUpToDate && mappingIsUpToDate; - this.mappingVersion = oldestIndexMappingVersion(event.state()); + this.mappingVersion = oldestIndexMappingVersion(state); - if (event.localNodeMaster()) { + if (state.nodes().isLocalNodeElectedMaster()) { if (templateIsUpToDate == false) { updateTemplate(); } @@ -233,7 +245,7 @@ public class IndexLifecycleManager extends AbstractComponent { final List indices = aliasOrIndex.getIndices(); if (aliasOrIndex.isAlias() && indices.size() > 1) { throw new IllegalStateException("Alias [" + indexOrAliasName + "] points to more than one index: " + - indices.stream().map(imd -> imd.getIndex().getName()).collect(Collectors.toList())); + indices.stream().map(imd -> imd.getIndex().getName()).collect(Collectors.toList())); } return indices.get(0); } @@ -280,10 +292,21 @@ public class IndexLifecycleManager extends AbstractComponent { @Override public void onFailure(Exception e) { migrateDataState.set(UpgradeState.FAILED); - logger.error((Supplier) () -> new ParameterizedMessage( - "failed to upgrade security [{}] data from version [{}] ", - indexName, previousVersion), + final int attempts = migrateDataAttempts.incrementAndGet(); + logger.error(new ParameterizedMessage( + "failed to upgrade security [{}] data from version [{}] (Attempt {} of {})", + indexName, previousVersion, attempts, MAX_MIGRATE_ATTEMPTS), e); + if (attempts < MAX_MIGRATE_ATTEMPTS) { + // The first retry is (1^5)ms = 1ms + // The last retry is (9^5)ms = 59s + final TimeValue retry = TimeValue.timeValueMillis((long) Math.pow(attempts, 5)); + logger.info("Will attempt upgrade again in {}", retry); + threadPool.schedule(retry, ThreadPool.Names.SAME, IndexLifecycleManager.this::retryDataMigration); + } else { + logger.error("Security migration has failed after {} attempts. Restart the master node to try again.", + MAX_MIGRATE_ATTEMPTS); + } } @Override @@ -300,6 +323,12 @@ public class IndexLifecycleManager extends AbstractComponent { } } + private void retryDataMigration() { + if (migrateDataState.compareAndSet(UpgradeState.FAILED, UpgradeState.NOT_STARTED)) { + processClusterState(clusterService.state()); + } + } + private void updateMapping() { // only update the mapping if this is not already in progress if (updateMappingPending.compareAndSet(false, true)) { diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmMigratorTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmMigratorTests.java index a6c7e7eecfb..b097c4f2ec0 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmMigratorTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/esnative/NativeRealmMigratorTests.java @@ -158,6 +158,10 @@ public class NativeRealmMigratorTests extends ESTestCase { verifyUpgrade(randomFrom(Version.V_6_0_0_alpha1), null, false); } + public void testNoChangeOnUpgradeFromV5alpha1() throws Exception { + verifyUpgrade(randomFrom(Version.V_5_0_0_alpha1), null, false); + } + public void testDisableLogstashBeatsAndConvertPasswordsOnUpgradeFromVersionPriorToV5_2() throws Exception { this.reservedUsers = Collections.singletonMap( KibanaUser.NAME, diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java index 42c554755d8..9be5577c99b 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealmTests.java @@ -18,14 +18,17 @@ import org.elasticsearch.test.ESTestCase; import org.junit.Before; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import static java.util.Collections.emptyMap; import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.not; @@ -185,6 +188,35 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { assertThat(realm.authInvocationCounter.intValue(), is(2)); } + public void testCacheDisabledUser() { + AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings); + realm.setUsersEnabled(false); + + String user = "testUser"; + SecureString password = new SecureString("password"); + + PlainActionFuture future = new PlainActionFuture<>(); + realm.authenticate(new UsernamePasswordToken(user, password), future); + assertThat(future.actionGet().enabled(), equalTo(false)); + + assertThat(realm.authInvocationCounter.intValue(), is(1)); + + realm.setUsersEnabled(true); + future = new PlainActionFuture<>(); + realm.authenticate(new UsernamePasswordToken(user, password), future); + future.actionGet(); + assertThat(future.actionGet().enabled(), equalTo(true)); + + assertThat(realm.authInvocationCounter.intValue(), is(2)); + + future = new PlainActionFuture<>(); + realm.authenticate(new UsernamePasswordToken(user, password), future); + future.actionGet(); + assertThat(future.actionGet().enabled(), equalTo(true)); + + assertThat(realm.authInvocationCounter.intValue(), is(2)); + } + public void testCacheWithVeryLowTtlExpiresBetweenAuthenticateCalls() throws InterruptedException { TimeValue ttl = TimeValue.timeValueNanos(randomIntBetween(10, 100)); Settings settings = Settings.builder() @@ -439,6 +471,8 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { public final AtomicInteger authInvocationCounter = new AtomicInteger(0); public final AtomicInteger lookupInvocationCounter = new AtomicInteger(0); + private boolean usersEnabled = true; + AlwaysAuthenticateCachingRealm(Settings globalSettings) { this(new RealmConfig("always-test", Settings.EMPTY, globalSettings, new ThreadContext(Settings.EMPTY))); } @@ -447,10 +481,15 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase { super("always", config); } + void setUsersEnabled(boolean usersEnabled) { + this.usersEnabled = usersEnabled; + } + @Override protected void doAuthenticate(UsernamePasswordToken token, ActionListener listener) { authInvocationCounter.incrementAndGet(); - listener.onResponse(new User(token.principal(), new String[] { "testRole1", "testRole2" })); + final User user = new User(token.principal(), new String[] { "testRole1", "testRole2" }, null, null, emptyMap(), usersEnabled); + listener.onResponse(user); } @Override diff --git a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java index 0c832a8f5c1..0d511278a30 100644 --- a/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java +++ b/plugin/src/test/java/org/elasticsearch/xpack/security/support/IndexLifecycleManagerTests.java @@ -6,6 +6,7 @@ package org.elasticsearch.xpack.security.support; import java.io.IOException; +import java.util.Collections; import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; @@ -31,7 +32,9 @@ import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.test.ESTestCase; @@ -41,9 +44,13 @@ import org.elasticsearch.xpack.security.test.SecurityTestUtils; import org.elasticsearch.xpack.template.TemplateUtils; import org.hamcrest.Matchers; import org.junit.Before; +import org.mockito.Mockito; import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.NULL_MIGRATOR; import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.TEMPLATE_VERSION_PATTERN; +import static org.hamcrest.Matchers.notNullValue; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.notNull; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -56,13 +63,17 @@ public class IndexLifecycleManagerTests extends ESTestCase { private IndexLifecycleManager manager; private IndexLifecycleManager.IndexDataMigrator migrator; private Map, Map>> actions; + private ThreadPool threadPool; + private ClusterService clusterService; @Before public void setUpManager() { final Client mockClient = mock(Client.class); - ThreadPool threadPool = mock(ThreadPool.class); + threadPool = mock(ThreadPool.class); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); + clusterService = mock(ClusterService.class); + actions = new LinkedHashMap<>(); final InternalClient client = new InternalClient(Settings.EMPTY, threadPool, mockClient) { @Override @@ -77,7 +88,7 @@ public class IndexLifecycleManagerTests extends ESTestCase { } }; migrator = NULL_MIGRATOR; - manager = new IndexLifecycleManager(Settings.EMPTY, client, INDEX_NAME, TEMPLATE_NAME, + manager = new IndexLifecycleManager(Settings.EMPTY, client, clusterService, threadPool, INDEX_NAME, TEMPLATE_NAME, // Wrap the migrator in a lambda so that individual tests can override the migrator implementation. (previousVersion, listener) -> migrator.performUpgrade(previousVersion, listener) ); @@ -143,6 +154,72 @@ public class IndexLifecycleManagerTests extends ESTestCase { assertCompleteState(true); } + public void testRetryDataMigration() throws IOException { + assertInitialState(); + + AtomicReference> migrationListenerRef = new AtomicReference<>(null); + migrator = (version, listener) -> migrationListenerRef.set(listener); + + ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME + "-v512"); + markShardsAvailable(clusterStateBuilder); + manager.clusterChanged(event(clusterStateBuilder)); + + assertTemplateAndMappingOutOfDate(true, false, IndexLifecycleManager.UpgradeState.IN_PROGRESS); + + actions.get(PutIndexTemplateAction.INSTANCE).values().forEach( + l -> ((ActionListener) l).onResponse(new PutIndexTemplateResponse(true) { + }) + ); + actions.get(PutIndexTemplateAction.INSTANCE).clear(); + + clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME, TEMPLATE_NAME + "-v512"); + markShardsAvailable(clusterStateBuilder); + when(clusterService.state()).thenReturn(clusterStateBuilder.build()); + + assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.IN_PROGRESS); + + AtomicReference scheduled = new AtomicReference<>(null); + when(threadPool.schedule(any(TimeValue.class), Mockito.eq(ThreadPool.Names.SAME), any(Runnable.class))).thenAnswer(invocation -> { + final Runnable runnable = (Runnable) invocation.getArguments()[2]; + scheduled.set(runnable); + return null; + }); + + + migrationListenerRef.get().onFailure(new RuntimeException("Migration Failed #1")); + assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.FAILED); + assertThat(scheduled.get(), notNullValue()); + + scheduled.get().run(); + assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.IN_PROGRESS); + scheduled.set(null); + + migrationListenerRef.get().onFailure(new RuntimeException("Migration Failed #2")); + assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.FAILED); + assertThat(scheduled.get(), notNullValue()); + + actions.getOrDefault(PutIndexTemplateAction.INSTANCE, Collections.emptyMap()).clear(); + scheduled.get().run(); + assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.IN_PROGRESS); + scheduled.set(null); + + migrationListenerRef.get().onResponse(false); + assertTemplateAndMappingOutOfDate(false, true, IndexLifecycleManager.UpgradeState.COMPLETE); + + actions.get(PutMappingAction.INSTANCE).values().forEach( + l -> ((ActionListener) l).onResponse(new PutMappingResponse(true) { + }) + ); + + assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.COMPLETE); + + clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME); + markShardsAvailable(clusterStateBuilder); + manager.clusterChanged(event(clusterStateBuilder)); + + assertCompleteState(true); + } + private void assertInitialState() { assertThat(manager.indexExists(), Matchers.equalTo(false)); assertThat(manager.isAvailable(), Matchers.equalTo(false)); @@ -190,7 +267,7 @@ public class IndexLifecycleManagerTests extends ESTestCase { if (templateUpdatePending) { final Map> requests = actions.get(PutIndexTemplateAction.INSTANCE); - assertThat(requests, Matchers.notNullValue()); + assertThat(requests, notNullValue()); assertThat(requests.size(), Matchers.equalTo(1)); final ActionRequest request = requests.keySet().iterator().next(); assertThat(request, Matchers.instanceOf(PutIndexTemplateRequest.class)); @@ -199,7 +276,7 @@ public class IndexLifecycleManagerTests extends ESTestCase { if (mappingUpdatePending) { final Map> requests = actions.get(PutMappingAction.INSTANCE); - assertThat(requests, Matchers.notNullValue()); + assertThat(requests, notNullValue()); assertThat(requests.size(), Matchers.equalTo(1)); final ActionRequest request = requests.keySet().iterator().next(); assertThat(request, Matchers.instanceOf(PutMappingRequest.class)); @@ -228,8 +305,13 @@ public class IndexLifecycleManagerTests extends ESTestCase { } public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException { + return createClusterState(indexName, templateName, templateName); + } + + private static ClusterState.Builder createClusterState(String indexName, String templateName, String buildMappingFrom) + throws IOException { IndexTemplateMetaData.Builder templateBuilder = getIndexTemplateMetaData(templateName); - IndexMetaData.Builder indexMeta = getIndexMetadata(indexName, templateName); + IndexMetaData.Builder indexMeta = getIndexMetadata(indexName, buildMappingFrom); MetaData.Builder metaDataBuilder = new MetaData.Builder(); metaDataBuilder.put(templateBuilder);