Improve upgrade process for reserved users (elastic/x-pack-elasticsearch#1712)

- Don't attempt to upgrade from 2.x
- Attempt up to 10 retries if the migration fails (with increasing back-off between attempts)
- If a cached user is disabled, recheck with the underlying store

The last change is required if the migration takes a long time.
While users are being migrated, they might be marked as disabled, but when the migration is complete they need to be usable immediately.

Original commit: elastic/x-pack-elasticsearch@2621867014
This commit is contained in:
Tim Vernum 2017-06-15 10:27:23 +10:00 committed by GitHub
parent 429f7ca7a3
commit e36f86cf95
7 changed files with 213 additions and 43 deletions

View File

@ -62,15 +62,14 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
} }
// package private for testing // package private for testing
SecurityLifecycleService(Settings settings, ClusterService clusterService, SecurityLifecycleService(Settings settings, ClusterService clusterService, ThreadPool threadPool, InternalClient client,
ThreadPool threadPool, InternalClient client, NativeRealmMigrator migrator, @Nullable IndexAuditTrail indexAuditTrail) {
NativeRealmMigrator migrator,
@Nullable IndexAuditTrail indexAuditTrail) {
super(settings); super(settings);
this.settings = settings; this.settings = settings;
this.threadPool = threadPool; this.threadPool = threadPool;
this.indexAuditTrail = indexAuditTrail; this.indexAuditTrail = indexAuditTrail;
this.securityIndex = new IndexLifecycleManager(settings, client, SECURITY_INDEX_NAME, SECURITY_TEMPLATE_NAME, migrator); this.securityIndex = new IndexLifecycleManager(settings, client, clusterService, threadPool, SECURITY_INDEX_NAME,
SECURITY_TEMPLATE_NAME, migrator);
clusterService.addListener(this); clusterService.addListener(this);
clusterService.addLifecycleListener(new LifecycleListener() { clusterService.addLifecycleListener(new LifecycleListener() {
@Override @Override

View File

@ -83,7 +83,7 @@ public class NativeRealmMigrator implements IndexLifecycleManager.IndexDataMigra
final GroupedActionListener<Void> countDownListener = new GroupedActionListener<>( final GroupedActionListener<Void> countDownListener = new GroupedActionListener<>(
ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure), tasks.size(), emptyList() ActionListener.wrap(r -> listener.onResponse(true), listener::onFailure), tasks.size(), emptyList()
); );
logger.info("Performing {} security migration task(s)", tasks.size()); logger.info("Performing {} security migration task(s) from version {}", tasks.size(), previousVersion);
tasks.forEach(t -> t.accept(previousVersion, countDownListener)); tasks.forEach(t -> t.accept(previousVersion, countDownListener));
} }
} catch (Exception e) { } catch (Exception e) {
@ -111,7 +111,9 @@ public class NativeRealmMigrator implements IndexLifecycleManager.IndexDataMigra
* installs but problematic for already-locked-down upgrades. * installs but problematic for already-locked-down upgrades.
*/ */
private boolean isNewUser(@Nullable Version previousVersion, BuiltinUserInfo info) { private boolean isNewUser(@Nullable Version previousVersion, BuiltinUserInfo info) {
return previousVersion != null && previousVersion.before(info.getDefinedSince()); return previousVersion != null
&& previousVersion.before(info.getDefinedSince())
&& previousVersion.onOrAfter(Version.V_5_0_0);
} }
private void createUserAsDisabled(BuiltinUserInfo info, @Nullable Version previousVersion, ActionListener<Void> listener) { private void createUserAsDisabled(BuiltinUserInfo info, @Nullable Version previousVersion, ActionListener<Void> listener) {
@ -151,7 +153,9 @@ public class NativeRealmMigrator implements IndexLifecycleManager.IndexDataMigra
* does the right thing. * does the right thing.
*/ */
private boolean shouldConvertDefaultPasswords(@Nullable Version previousVersion) { private boolean shouldConvertDefaultPasswords(@Nullable Version previousVersion) {
return previousVersion != null && previousVersion.before(Version.V_6_0_0_alpha1); return previousVersion != null
&& previousVersion.before(Version.V_6_0_0_alpha1)
&& previousVersion.onOrAfter(Version.V_5_0_0);
} }
@SuppressWarnings("unused") @SuppressWarnings("unused")

View File

@ -100,9 +100,22 @@ public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm
}, listener::onFailure)); }, listener::onFailure));
} else if (userWithHash.hasHash()) { } else if (userWithHash.hasHash()) {
if (userWithHash.verify(token.credentials())) { if (userWithHash.verify(token.credentials())) {
logger.debug("realm [{}] authenticated user [{}], with roles [{}]", name(), token.principal(), if (userWithHash.user.enabled()) {
userWithHash.user.roles()); User user = userWithHash.user;
listener.onResponse(userWithHash.user); logger.debug("realm [{}] authenticated user [{}], with roles [{}]", name(), token.principal(), user.roles());
listener.onResponse(user);
} else {
// We successfully authenticated, but the cached user is disabled.
// Reload the primary record to check whether the user is still disabled
cache.invalidate(token.principal());
doAuthenticateAndCache(token, ActionListener.wrap((user) -> {
if (user != null) {
logger.debug("realm [{}] authenticated user [{}] (enabled:{}), with roles [{}]", name(), token.principal(),
user.enabled(), user.roles());
}
listener.onResponse(user);
}, listener::onFailure));
}
} else { } else {
cache.invalidate(token.principal()); cache.invalidate(token.principal());
doAuthenticateAndCache(token, ActionListener.wrap((user) -> { doAuthenticateAndCache(token, ActionListener.wrap((user) -> {

View File

@ -5,6 +5,19 @@
*/ */
package org.elasticsearch.xpack.security.support; package org.elasticsearch.xpack.security.support;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage; import org.apache.logging.log4j.message.ParameterizedMessage;
import org.apache.logging.log4j.util.Supplier; import org.apache.logging.log4j.util.Supplier;
@ -20,35 +33,23 @@ import org.elasticsearch.cluster.ClusterChangedEvent;
import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.AliasOrIndex; import org.elasticsearch.cluster.metadata.AliasOrIndex;
import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MappingMetaData; import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.IndexRoutingTable; import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesArray; import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.component.AbstractComponent; import org.elasticsearch.common.component.AbstractComponent;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.inject.internal.Nullable; import org.elasticsearch.common.inject.internal.Nullable;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections; import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.mapper.MapperService;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.xpack.security.InternalClient; import org.elasticsearch.xpack.security.InternalClient;
import org.elasticsearch.xpack.template.TemplateUtils; import org.elasticsearch.xpack.template.TemplateUtils;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap; import static org.elasticsearch.common.xcontent.XContentHelper.convertToMap;
/** /**
@ -60,15 +61,20 @@ public class IndexLifecycleManager extends AbstractComponent {
public static final String TEMPLATE_VERSION_PATTERN = public static final String TEMPLATE_VERSION_PATTERN =
Pattern.quote("${security.template.version}"); Pattern.quote("${security.template.version}");
private static final int MAX_MIGRATE_ATTEMPTS = 10;
private final String indexName; private final String indexName;
private final String templateName; private final String templateName;
private final InternalClient client; private final InternalClient client;
private final IndexDataMigrator migrator; private final IndexDataMigrator migrator;
private final ClusterService clusterService;
private final ThreadPool threadPool;
private final AtomicBoolean templateCreationPending = new AtomicBoolean(false); private final AtomicBoolean templateCreationPending = new AtomicBoolean(false);
private final AtomicBoolean updateMappingPending = new AtomicBoolean(false); private final AtomicBoolean updateMappingPending = new AtomicBoolean(false);
private final AtomicReference<UpgradeState> migrateDataState = new AtomicReference<>(UpgradeState.NOT_STARTED); private final AtomicReference<UpgradeState> migrateDataState = new AtomicReference<>(UpgradeState.NOT_STARTED);
private final AtomicInteger migrateDataAttempts = new AtomicInteger(0);
private volatile boolean templateIsUpToDate; private volatile boolean templateIsUpToDate;
private volatile boolean indexExists; private volatile boolean indexExists;
@ -85,16 +91,17 @@ public class IndexLifecycleManager extends AbstractComponent {
void performUpgrade(@Nullable Version previousVersion, ActionListener<Boolean> listener); void performUpgrade(@Nullable Version previousVersion, ActionListener<Boolean> listener);
} }
public static final IndexDataMigrator NULL_MIGRATOR = public static final IndexDataMigrator NULL_MIGRATOR = (version, listener) -> listener.onResponse(false);
(version, listener) -> listener.onResponse(false);
public IndexLifecycleManager(Settings settings, InternalClient client, String indexName, public IndexLifecycleManager(Settings settings, InternalClient client, ClusterService clusterService, ThreadPool threadPool,
String templateName, IndexDataMigrator migrator) { String indexName, String templateName, IndexDataMigrator migrator) {
super(settings); super(settings);
this.client = client; this.client = client;
this.indexName = indexName; this.indexName = indexName;
this.templateName = templateName; this.templateName = templateName;
this.migrator = migrator; this.migrator = migrator;
this.clusterService = clusterService;
this.threadPool = threadPool;
} }
public boolean isTemplateUpToDate() { public boolean isTemplateUpToDate() {
@ -139,15 +146,20 @@ public class IndexLifecycleManager extends AbstractComponent {
public void clusterChanged(ClusterChangedEvent event) { public void clusterChanged(ClusterChangedEvent event) {
final ClusterState state = event.state(); final ClusterState state = event.state();
this.indexExists = resolveConcreteIndex(indexName, event.state().metaData()) != null; processClusterState(state);
}
private void processClusterState(ClusterState state) {
assert state != null;
this.indexExists = resolveConcreteIndex(indexName, state.metaData()) != null;
this.indexAvailable = checkIndexAvailable(state); this.indexAvailable = checkIndexAvailable(state);
this.templateIsUpToDate = TemplateUtils.checkTemplateExistsAndIsUpToDate(templateName, this.templateIsUpToDate = TemplateUtils.checkTemplateExistsAndIsUpToDate(templateName,
SECURITY_VERSION_STRING, state, logger); SECURITY_VERSION_STRING, state, logger);
this.mappingIsUpToDate = checkIndexMappingUpToDate(state); this.mappingIsUpToDate = checkIndexMappingUpToDate(state);
this.canWriteToIndex = templateIsUpToDate && mappingIsUpToDate; this.canWriteToIndex = templateIsUpToDate && mappingIsUpToDate;
this.mappingVersion = oldestIndexMappingVersion(event.state()); this.mappingVersion = oldestIndexMappingVersion(state);
if (event.localNodeMaster()) { if (state.nodes().isLocalNodeElectedMaster()) {
if (templateIsUpToDate == false) { if (templateIsUpToDate == false) {
updateTemplate(); updateTemplate();
} }
@ -233,7 +245,7 @@ public class IndexLifecycleManager extends AbstractComponent {
final List<IndexMetaData> indices = aliasOrIndex.getIndices(); final List<IndexMetaData> indices = aliasOrIndex.getIndices();
if (aliasOrIndex.isAlias() && indices.size() > 1) { if (aliasOrIndex.isAlias() && indices.size() > 1) {
throw new IllegalStateException("Alias [" + indexOrAliasName + "] points to more than one index: " + throw new IllegalStateException("Alias [" + indexOrAliasName + "] points to more than one index: " +
indices.stream().map(imd -> imd.getIndex().getName()).collect(Collectors.toList())); indices.stream().map(imd -> imd.getIndex().getName()).collect(Collectors.toList()));
} }
return indices.get(0); return indices.get(0);
} }
@ -280,10 +292,21 @@ public class IndexLifecycleManager extends AbstractComponent {
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
migrateDataState.set(UpgradeState.FAILED); migrateDataState.set(UpgradeState.FAILED);
logger.error((Supplier<?>) () -> new ParameterizedMessage( final int attempts = migrateDataAttempts.incrementAndGet();
"failed to upgrade security [{}] data from version [{}] ", logger.error(new ParameterizedMessage(
indexName, previousVersion), "failed to upgrade security [{}] data from version [{}] (Attempt {} of {})",
indexName, previousVersion, attempts, MAX_MIGRATE_ATTEMPTS),
e); e);
if (attempts < MAX_MIGRATE_ATTEMPTS) {
// The first retry is (1^5)ms = 1ms
// The last retry is (9^5)ms = 59s
final TimeValue retry = TimeValue.timeValueMillis((long) Math.pow(attempts, 5));
logger.info("Will attempt upgrade again in {}", retry);
threadPool.schedule(retry, ThreadPool.Names.SAME, IndexLifecycleManager.this::retryDataMigration);
} else {
logger.error("Security migration has failed after {} attempts. Restart the master node to try again.",
MAX_MIGRATE_ATTEMPTS);
}
} }
@Override @Override
@ -300,6 +323,12 @@ public class IndexLifecycleManager extends AbstractComponent {
} }
} }
private void retryDataMigration() {
if (migrateDataState.compareAndSet(UpgradeState.FAILED, UpgradeState.NOT_STARTED)) {
processClusterState(clusterService.state());
}
}
private void updateMapping() { private void updateMapping() {
// only update the mapping if this is not already in progress // only update the mapping if this is not already in progress
if (updateMappingPending.compareAndSet(false, true)) { if (updateMappingPending.compareAndSet(false, true)) {

View File

@ -158,6 +158,10 @@ public class NativeRealmMigratorTests extends ESTestCase {
verifyUpgrade(randomFrom(Version.V_6_0_0_alpha1), null, false); verifyUpgrade(randomFrom(Version.V_6_0_0_alpha1), null, false);
} }
public void testNoChangeOnUpgradeFromV5alpha1() throws Exception {
verifyUpgrade(randomFrom(Version.V_5_0_0_alpha1), null, false);
}
public void testDisableLogstashBeatsAndConvertPasswordsOnUpgradeFromVersionPriorToV5_2() throws Exception { public void testDisableLogstashBeatsAndConvertPasswordsOnUpgradeFromVersionPriorToV5_2() throws Exception {
this.reservedUsers = Collections.singletonMap( this.reservedUsers = Collections.singletonMap(
KibanaUser.NAME, KibanaUser.NAME,

View File

@ -18,14 +18,17 @@ import org.elasticsearch.test.ESTestCase;
import org.junit.Before; import org.junit.Before;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import static java.util.Collections.emptyMap;
import static org.hamcrest.Matchers.arrayContaining; import static org.hamcrest.Matchers.arrayContaining;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.not; import static org.hamcrest.Matchers.not;
@ -185,6 +188,35 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase {
assertThat(realm.authInvocationCounter.intValue(), is(2)); assertThat(realm.authInvocationCounter.intValue(), is(2));
} }
public void testCacheDisabledUser() {
AlwaysAuthenticateCachingRealm realm = new AlwaysAuthenticateCachingRealm(globalSettings);
realm.setUsersEnabled(false);
String user = "testUser";
SecureString password = new SecureString("password");
PlainActionFuture<User> future = new PlainActionFuture<>();
realm.authenticate(new UsernamePasswordToken(user, password), future);
assertThat(future.actionGet().enabled(), equalTo(false));
assertThat(realm.authInvocationCounter.intValue(), is(1));
realm.setUsersEnabled(true);
future = new PlainActionFuture<>();
realm.authenticate(new UsernamePasswordToken(user, password), future);
future.actionGet();
assertThat(future.actionGet().enabled(), equalTo(true));
assertThat(realm.authInvocationCounter.intValue(), is(2));
future = new PlainActionFuture<>();
realm.authenticate(new UsernamePasswordToken(user, password), future);
future.actionGet();
assertThat(future.actionGet().enabled(), equalTo(true));
assertThat(realm.authInvocationCounter.intValue(), is(2));
}
public void testCacheWithVeryLowTtlExpiresBetweenAuthenticateCalls() throws InterruptedException { public void testCacheWithVeryLowTtlExpiresBetweenAuthenticateCalls() throws InterruptedException {
TimeValue ttl = TimeValue.timeValueNanos(randomIntBetween(10, 100)); TimeValue ttl = TimeValue.timeValueNanos(randomIntBetween(10, 100));
Settings settings = Settings.builder() Settings settings = Settings.builder()
@ -439,6 +471,8 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase {
public final AtomicInteger authInvocationCounter = new AtomicInteger(0); public final AtomicInteger authInvocationCounter = new AtomicInteger(0);
public final AtomicInteger lookupInvocationCounter = new AtomicInteger(0); public final AtomicInteger lookupInvocationCounter = new AtomicInteger(0);
private boolean usersEnabled = true;
AlwaysAuthenticateCachingRealm(Settings globalSettings) { AlwaysAuthenticateCachingRealm(Settings globalSettings) {
this(new RealmConfig("always-test", Settings.EMPTY, globalSettings, new ThreadContext(Settings.EMPTY))); this(new RealmConfig("always-test", Settings.EMPTY, globalSettings, new ThreadContext(Settings.EMPTY)));
} }
@ -447,10 +481,15 @@ public class CachingUsernamePasswordRealmTests extends ESTestCase {
super("always", config); super("always", config);
} }
void setUsersEnabled(boolean usersEnabled) {
this.usersEnabled = usersEnabled;
}
@Override @Override
protected void doAuthenticate(UsernamePasswordToken token, ActionListener<User> listener) { protected void doAuthenticate(UsernamePasswordToken token, ActionListener<User> listener) {
authInvocationCounter.incrementAndGet(); authInvocationCounter.incrementAndGet();
listener.onResponse(new User(token.principal(), new String[] { "testRole1", "testRole2" })); final User user = new User(token.principal(), new String[] { "testRole1", "testRole2" }, null, null, emptyMap(), usersEnabled);
listener.onResponse(user);
} }
@Override @Override

View File

@ -6,6 +6,7 @@
package org.elasticsearch.xpack.security.support; package org.elasticsearch.xpack.security.support;
import java.io.IOException; import java.io.IOException;
import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.LinkedHashMap; import java.util.LinkedHashMap;
import java.util.Map; import java.util.Map;
@ -31,7 +32,9 @@ import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData; import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ThreadContext; import org.elasticsearch.common.util.concurrent.ThreadContext;
import org.elasticsearch.common.xcontent.XContentType; import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.test.ESTestCase; import org.elasticsearch.test.ESTestCase;
@ -41,9 +44,13 @@ import org.elasticsearch.xpack.security.test.SecurityTestUtils;
import org.elasticsearch.xpack.template.TemplateUtils; import org.elasticsearch.xpack.template.TemplateUtils;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Before; import org.junit.Before;
import org.mockito.Mockito;
import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.NULL_MIGRATOR; import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.NULL_MIGRATOR;
import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.TEMPLATE_VERSION_PATTERN; import static org.elasticsearch.xpack.security.support.IndexLifecycleManager.TEMPLATE_VERSION_PATTERN;
import static org.hamcrest.Matchers.notNullValue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.notNull;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -56,13 +63,17 @@ public class IndexLifecycleManagerTests extends ESTestCase {
private IndexLifecycleManager manager; private IndexLifecycleManager manager;
private IndexLifecycleManager.IndexDataMigrator migrator; private IndexLifecycleManager.IndexDataMigrator migrator;
private Map<Action<?, ?, ?>, Map<ActionRequest, ActionListener<?>>> actions; private Map<Action<?, ?, ?>, Map<ActionRequest, ActionListener<?>>> actions;
private ThreadPool threadPool;
private ClusterService clusterService;
@Before @Before
public void setUpManager() { public void setUpManager() {
final Client mockClient = mock(Client.class); final Client mockClient = mock(Client.class);
ThreadPool threadPool = mock(ThreadPool.class); threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY)); when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
clusterService = mock(ClusterService.class);
actions = new LinkedHashMap<>(); actions = new LinkedHashMap<>();
final InternalClient client = new InternalClient(Settings.EMPTY, threadPool, mockClient) { final InternalClient client = new InternalClient(Settings.EMPTY, threadPool, mockClient) {
@Override @Override
@ -77,7 +88,7 @@ public class IndexLifecycleManagerTests extends ESTestCase {
} }
}; };
migrator = NULL_MIGRATOR; migrator = NULL_MIGRATOR;
manager = new IndexLifecycleManager(Settings.EMPTY, client, INDEX_NAME, TEMPLATE_NAME, manager = new IndexLifecycleManager(Settings.EMPTY, client, clusterService, threadPool, INDEX_NAME, TEMPLATE_NAME,
// Wrap the migrator in a lambda so that individual tests can override the migrator implementation. // Wrap the migrator in a lambda so that individual tests can override the migrator implementation.
(previousVersion, listener) -> migrator.performUpgrade(previousVersion, listener) (previousVersion, listener) -> migrator.performUpgrade(previousVersion, listener)
); );
@ -143,6 +154,72 @@ public class IndexLifecycleManagerTests extends ESTestCase {
assertCompleteState(true); assertCompleteState(true);
} }
public void testRetryDataMigration() throws IOException {
assertInitialState();
AtomicReference<ActionListener<Boolean>> migrationListenerRef = new AtomicReference<>(null);
migrator = (version, listener) -> migrationListenerRef.set(listener);
ClusterState.Builder clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME + "-v512");
markShardsAvailable(clusterStateBuilder);
manager.clusterChanged(event(clusterStateBuilder));
assertTemplateAndMappingOutOfDate(true, false, IndexLifecycleManager.UpgradeState.IN_PROGRESS);
actions.get(PutIndexTemplateAction.INSTANCE).values().forEach(
l -> ((ActionListener<PutIndexTemplateResponse>) l).onResponse(new PutIndexTemplateResponse(true) {
})
);
actions.get(PutIndexTemplateAction.INSTANCE).clear();
clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME, TEMPLATE_NAME + "-v512");
markShardsAvailable(clusterStateBuilder);
when(clusterService.state()).thenReturn(clusterStateBuilder.build());
assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.IN_PROGRESS);
AtomicReference<Runnable> scheduled = new AtomicReference<>(null);
when(threadPool.schedule(any(TimeValue.class), Mockito.eq(ThreadPool.Names.SAME), any(Runnable.class))).thenAnswer(invocation -> {
final Runnable runnable = (Runnable) invocation.getArguments()[2];
scheduled.set(runnable);
return null;
});
migrationListenerRef.get().onFailure(new RuntimeException("Migration Failed #1"));
assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.FAILED);
assertThat(scheduled.get(), notNullValue());
scheduled.get().run();
assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.IN_PROGRESS);
scheduled.set(null);
migrationListenerRef.get().onFailure(new RuntimeException("Migration Failed #2"));
assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.FAILED);
assertThat(scheduled.get(), notNullValue());
actions.getOrDefault(PutIndexTemplateAction.INSTANCE, Collections.emptyMap()).clear();
scheduled.get().run();
assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.IN_PROGRESS);
scheduled.set(null);
migrationListenerRef.get().onResponse(false);
assertTemplateAndMappingOutOfDate(false, true, IndexLifecycleManager.UpgradeState.COMPLETE);
actions.get(PutMappingAction.INSTANCE).values().forEach(
l -> ((ActionListener<PutMappingResponse>) l).onResponse(new PutMappingResponse(true) {
})
);
assertTemplateAndMappingOutOfDate(false, false, IndexLifecycleManager.UpgradeState.COMPLETE);
clusterStateBuilder = createClusterState(INDEX_NAME, TEMPLATE_NAME);
markShardsAvailable(clusterStateBuilder);
manager.clusterChanged(event(clusterStateBuilder));
assertCompleteState(true);
}
private void assertInitialState() { private void assertInitialState() {
assertThat(manager.indexExists(), Matchers.equalTo(false)); assertThat(manager.indexExists(), Matchers.equalTo(false));
assertThat(manager.isAvailable(), Matchers.equalTo(false)); assertThat(manager.isAvailable(), Matchers.equalTo(false));
@ -190,7 +267,7 @@ public class IndexLifecycleManagerTests extends ESTestCase {
if (templateUpdatePending) { if (templateUpdatePending) {
final Map<ActionRequest, ActionListener<?>> requests = actions.get(PutIndexTemplateAction.INSTANCE); final Map<ActionRequest, ActionListener<?>> requests = actions.get(PutIndexTemplateAction.INSTANCE);
assertThat(requests, Matchers.notNullValue()); assertThat(requests, notNullValue());
assertThat(requests.size(), Matchers.equalTo(1)); assertThat(requests.size(), Matchers.equalTo(1));
final ActionRequest request = requests.keySet().iterator().next(); final ActionRequest request = requests.keySet().iterator().next();
assertThat(request, Matchers.instanceOf(PutIndexTemplateRequest.class)); assertThat(request, Matchers.instanceOf(PutIndexTemplateRequest.class));
@ -199,7 +276,7 @@ public class IndexLifecycleManagerTests extends ESTestCase {
if (mappingUpdatePending) { if (mappingUpdatePending) {
final Map<ActionRequest, ActionListener<?>> requests = actions.get(PutMappingAction.INSTANCE); final Map<ActionRequest, ActionListener<?>> requests = actions.get(PutMappingAction.INSTANCE);
assertThat(requests, Matchers.notNullValue()); assertThat(requests, notNullValue());
assertThat(requests.size(), Matchers.equalTo(1)); assertThat(requests.size(), Matchers.equalTo(1));
final ActionRequest request = requests.keySet().iterator().next(); final ActionRequest request = requests.keySet().iterator().next();
assertThat(request, Matchers.instanceOf(PutMappingRequest.class)); assertThat(request, Matchers.instanceOf(PutMappingRequest.class));
@ -228,8 +305,13 @@ public class IndexLifecycleManagerTests extends ESTestCase {
} }
public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException { public static ClusterState.Builder createClusterState(String indexName, String templateName) throws IOException {
return createClusterState(indexName, templateName, templateName);
}
private static ClusterState.Builder createClusterState(String indexName, String templateName, String buildMappingFrom)
throws IOException {
IndexTemplateMetaData.Builder templateBuilder = getIndexTemplateMetaData(templateName); IndexTemplateMetaData.Builder templateBuilder = getIndexTemplateMetaData(templateName);
IndexMetaData.Builder indexMeta = getIndexMetadata(indexName, templateName); IndexMetaData.Builder indexMeta = getIndexMetadata(indexName, buildMappingFrom);
MetaData.Builder metaDataBuilder = new MetaData.Builder(); MetaData.Builder metaDataBuilder = new MetaData.Builder();
metaDataBuilder.put(templateBuilder); metaDataBuilder.put(templateBuilder);