Fix audit index template upgrade loop (#30779)
The Index Audit trail allows the override of the template index settings with settings specified on the conf file. A bug will manifest when such conf file settings are specified for templates that need to be upgraded. The bug is an endless upgrade loop because the upgrade, although successful, is not reckoned as such by the upgrade service.
This commit is contained in:
parent
23d156f023
commit
0c8c619181
|
@ -19,7 +19,6 @@
|
||||||
|
|
||||||
package org.elasticsearch.cluster.metadata;
|
package org.elasticsearch.cluster.metadata;
|
||||||
|
|
||||||
import com.carrotsearch.hppc.cursors.ObjectCursor;
|
|
||||||
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
import com.carrotsearch.hppc.cursors.ObjectObjectCursor;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
@ -32,8 +31,6 @@ import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateListener;
|
import org.elasticsearch.cluster.ClusterStateListener;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
|
@ -57,6 +54,7 @@ import java.util.HashSet;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.function.UnaryOperator;
|
import java.util.function.UnaryOperator;
|
||||||
|
|
||||||
|
@ -74,7 +72,7 @@ public class TemplateUpgradeService extends AbstractComponent implements Cluster
|
||||||
|
|
||||||
public final Client client;
|
public final Client client;
|
||||||
|
|
||||||
private final AtomicInteger updatesInProgress = new AtomicInteger();
|
final AtomicInteger upgradesInProgress = new AtomicInteger();
|
||||||
|
|
||||||
private ImmutableOpenMap<String, IndexTemplateMetaData> lastTemplateMetaData;
|
private ImmutableOpenMap<String, IndexTemplateMetaData> lastTemplateMetaData;
|
||||||
|
|
||||||
|
@ -103,8 +101,8 @@ public class TemplateUpgradeService extends AbstractComponent implements Cluster
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (updatesInProgress.get() > 0) {
|
if (upgradesInProgress.get() > 0) {
|
||||||
// we are already running some updates - skip this cluster state update
|
// we are already running some upgrades - skip this cluster state update
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -124,7 +122,7 @@ public class TemplateUpgradeService extends AbstractComponent implements Cluster
|
||||||
lastTemplateMetaData = templates;
|
lastTemplateMetaData = templates;
|
||||||
Optional<Tuple<Map<String, BytesReference>, Set<String>>> changes = calculateTemplateChanges(templates);
|
Optional<Tuple<Map<String, BytesReference>, Set<String>>> changes = calculateTemplateChanges(templates);
|
||||||
if (changes.isPresent()) {
|
if (changes.isPresent()) {
|
||||||
if (updatesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size())) {
|
if (upgradesInProgress.compareAndSet(0, changes.get().v1().size() + changes.get().v2().size() + 1)) {
|
||||||
logger.info("Starting template upgrade to version {}, {} templates will be updated and {} will be removed",
|
logger.info("Starting template upgrade to version {}, {} templates will be updated and {} will be removed",
|
||||||
Version.CURRENT,
|
Version.CURRENT,
|
||||||
changes.get().v1().size(),
|
changes.get().v1().size(),
|
||||||
|
@ -133,13 +131,14 @@ public class TemplateUpgradeService extends AbstractComponent implements Cluster
|
||||||
final ThreadContext threadContext = threadPool.getThreadContext();
|
final ThreadContext threadContext = threadPool.getThreadContext();
|
||||||
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
threadContext.markAsSystemContext();
|
threadContext.markAsSystemContext();
|
||||||
threadPool.generic().execute(() -> updateTemplates(changes.get().v1(), changes.get().v2()));
|
threadPool.generic().execute(() -> upgradeTemplates(changes.get().v1(), changes.get().v2()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void updateTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
|
void upgradeTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
|
||||||
|
final AtomicBoolean anyUpgradeFailed = new AtomicBoolean(false);
|
||||||
if (threadPool.getThreadContext().isSystemContext() == false) {
|
if (threadPool.getThreadContext().isSystemContext() == false) {
|
||||||
throw new IllegalStateException("template updates from the template upgrade service should always happen in a system context");
|
throw new IllegalStateException("template updates from the template upgrade service should always happen in a system context");
|
||||||
}
|
}
|
||||||
|
@ -151,20 +150,18 @@ public class TemplateUpgradeService extends AbstractComponent implements Cluster
|
||||||
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
|
client.admin().indices().putTemplate(request, new ActionListener<PutIndexTemplateResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(PutIndexTemplateResponse response) {
|
public void onResponse(PutIndexTemplateResponse response) {
|
||||||
if (updatesInProgress.decrementAndGet() == 0) {
|
|
||||||
logger.info("Finished upgrading templates to version {}", Version.CURRENT);
|
|
||||||
}
|
|
||||||
if (response.isAcknowledged() == false) {
|
if (response.isAcknowledged() == false) {
|
||||||
|
anyUpgradeFailed.set(true);
|
||||||
logger.warn("Error updating template [{}], request was not acknowledged", change.getKey());
|
logger.warn("Error updating template [{}], request was not acknowledged", change.getKey());
|
||||||
}
|
}
|
||||||
|
tryFinishUpgrade(anyUpgradeFailed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
if (updatesInProgress.decrementAndGet() == 0) {
|
anyUpgradeFailed.set(true);
|
||||||
logger.info("Templates were upgraded to version {}", Version.CURRENT);
|
|
||||||
}
|
|
||||||
logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e);
|
logger.warn(new ParameterizedMessage("Error updating template [{}]", change.getKey()), e);
|
||||||
|
tryFinishUpgrade(anyUpgradeFailed);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
@ -175,27 +172,51 @@ public class TemplateUpgradeService extends AbstractComponent implements Cluster
|
||||||
client.admin().indices().deleteTemplate(request, new ActionListener<DeleteIndexTemplateResponse>() {
|
client.admin().indices().deleteTemplate(request, new ActionListener<DeleteIndexTemplateResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(DeleteIndexTemplateResponse response) {
|
public void onResponse(DeleteIndexTemplateResponse response) {
|
||||||
updatesInProgress.decrementAndGet();
|
|
||||||
if (response.isAcknowledged() == false) {
|
if (response.isAcknowledged() == false) {
|
||||||
|
anyUpgradeFailed.set(true);
|
||||||
logger.warn("Error deleting template [{}], request was not acknowledged", template);
|
logger.warn("Error deleting template [{}], request was not acknowledged", template);
|
||||||
}
|
}
|
||||||
|
tryFinishUpgrade(anyUpgradeFailed);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
updatesInProgress.decrementAndGet();
|
anyUpgradeFailed.set(true);
|
||||||
if (e instanceof IndexTemplateMissingException == false) {
|
if (e instanceof IndexTemplateMissingException == false) {
|
||||||
// we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist
|
// we might attempt to delete the same template from different nodes - so that's ok if template doesn't exist
|
||||||
// otherwise we need to warn
|
// otherwise we need to warn
|
||||||
logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e);
|
logger.warn(new ParameterizedMessage("Error deleting template [{}]", template), e);
|
||||||
}
|
}
|
||||||
|
tryFinishUpgrade(anyUpgradeFailed);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
int getUpdatesInProgress() {
|
void tryFinishUpgrade(AtomicBoolean anyUpgradeFailed) {
|
||||||
return updatesInProgress.get();
|
assert upgradesInProgress.get() > 0;
|
||||||
|
if (upgradesInProgress.decrementAndGet() == 1) {
|
||||||
|
try {
|
||||||
|
// this is the last upgrade, the templates should now be in the desired state
|
||||||
|
if (anyUpgradeFailed.get()) {
|
||||||
|
logger.info("Templates were partially upgraded to version {}", Version.CURRENT);
|
||||||
|
} else {
|
||||||
|
logger.info("Templates were upgraded successfuly to version {}", Version.CURRENT);
|
||||||
|
}
|
||||||
|
// Check upgraders are satisfied after the update completed. If they still
|
||||||
|
// report that changes are required, this might indicate a bug or that something
|
||||||
|
// else tinkering with the templates during the upgrade.
|
||||||
|
final ImmutableOpenMap<String, IndexTemplateMetaData> upgradedTemplates =
|
||||||
|
clusterService.state().getMetaData().getTemplates();
|
||||||
|
final boolean changesRequired = calculateTemplateChanges(upgradedTemplates).isPresent();
|
||||||
|
if (changesRequired) {
|
||||||
|
logger.warn("Templates are still reported as out of date after the upgrade. The template upgrade will be retried.");
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
final int noMoreUpgrades = upgradesInProgress.decrementAndGet();
|
||||||
|
assert noMoreUpgrades == 0;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
Optional<Tuple<Map<String, BytesReference>, Set<String>>> calculateTemplateChanges(
|
Optional<Tuple<Map<String, BytesReference>, Set<String>>> calculateTemplateChanges(
|
||||||
|
|
|
@ -35,12 +35,16 @@ import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
import org.elasticsearch.common.bytes.BytesArray;
|
import org.elasticsearch.common.bytes.BytesArray;
|
||||||
import org.elasticsearch.common.bytes.BytesReference;
|
import org.elasticsearch.common.bytes.BytesReference;
|
||||||
|
import org.elasticsearch.common.collect.ImmutableOpenMap;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.settings.ClusterSettings;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
import org.elasticsearch.common.util.concurrent.ThreadContext;
|
||||||
|
import org.elasticsearch.test.ClusterServiceUtils;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
|
import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
|
@ -52,13 +56,16 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Optional;
|
import java.util.Optional;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.Semaphore;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicReference;
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
import static java.util.Collections.emptyMap;
|
import static java.util.Collections.emptyMap;
|
||||||
|
import static org.elasticsearch.test.ClusterServiceUtils.createClusterService;
|
||||||
|
import static org.elasticsearch.test.ClusterServiceUtils.setState;
|
||||||
import static org.elasticsearch.test.VersionUtils.randomVersion;
|
import static org.elasticsearch.test.VersionUtils.randomVersion;
|
||||||
import static org.hamcrest.CoreMatchers.nullValue;
|
import static org.hamcrest.CoreMatchers.nullValue;
|
||||||
import static org.hamcrest.CoreMatchers.startsWith;
|
import static org.hamcrest.CoreMatchers.startsWith;
|
||||||
|
@ -75,8 +82,20 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
public class TemplateUpgradeServiceTests extends ESTestCase {
|
public class TemplateUpgradeServiceTests extends ESTestCase {
|
||||||
|
|
||||||
private final ClusterService clusterService = new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY,
|
private ThreadPool threadPool;
|
||||||
ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null, Collections.emptyMap());
|
private ClusterService clusterService;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUpTest() throws Exception {
|
||||||
|
threadPool = new TestThreadPool("TemplateUpgradeServiceTests");
|
||||||
|
clusterService = createClusterService(threadPool);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDownTest() throws Exception {
|
||||||
|
threadPool.shutdownNow();
|
||||||
|
clusterService.close();
|
||||||
|
}
|
||||||
|
|
||||||
public void testCalculateChangesAddChangeAndDelete() {
|
public void testCalculateChangesAddChangeAndDelete() {
|
||||||
|
|
||||||
|
@ -90,7 +109,7 @@ public class TemplateUpgradeServiceTests extends ESTestCase {
|
||||||
IndexTemplateMetaData.builder("changed_test_template").patterns(randomIndexPatterns()).build()
|
IndexTemplateMetaData.builder("changed_test_template").patterns(randomIndexPatterns()).build()
|
||||||
);
|
);
|
||||||
|
|
||||||
TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, null,
|
final TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, null, clusterService, threadPool,
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
templates -> {
|
templates -> {
|
||||||
if (shouldAdd) {
|
if (shouldAdd) {
|
||||||
|
@ -190,18 +209,18 @@ public class TemplateUpgradeServiceTests extends ESTestCase {
|
||||||
additions.put("add_template_" + i, new BytesArray("{\"index_patterns\" : \"*\", \"order\" : " + i + "}"));
|
additions.put("add_template_" + i, new BytesArray("{\"index_patterns\" : \"*\", \"order\" : " + i + "}"));
|
||||||
}
|
}
|
||||||
|
|
||||||
ThreadPool threadPool = mock(ThreadPool.class);
|
final TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool,
|
||||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
|
||||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
|
||||||
TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool,
|
|
||||||
Collections.emptyList());
|
Collections.emptyList());
|
||||||
|
|
||||||
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> service.updateTemplates(additions, deletions));
|
IllegalStateException ise = expectThrows(IllegalStateException.class, () -> service.upgradeTemplates(additions, deletions));
|
||||||
assertThat(ise.getMessage(), containsString("template upgrade service should always happen in a system context"));
|
assertThat(ise.getMessage(), containsString("template upgrade service should always happen in a system context"));
|
||||||
|
|
||||||
threadContext.markAsSystemContext();
|
service.upgradesInProgress.set(additionsCount + deletionsCount + 2); // +2 to skip tryFinishUpgrade
|
||||||
service.updateTemplates(additions, deletions);
|
final ThreadContext threadContext = threadPool.getThreadContext();
|
||||||
int updatesInProgress = service.getUpdatesInProgress();
|
try (ThreadContext.StoredContext ignore = threadContext.stashContext()) {
|
||||||
|
threadContext.markAsSystemContext();
|
||||||
|
service.upgradeTemplates(additions, deletions);
|
||||||
|
}
|
||||||
|
|
||||||
assertThat(putTemplateListeners, hasSize(additionsCount));
|
assertThat(putTemplateListeners, hasSize(additionsCount));
|
||||||
assertThat(deleteTemplateListeners, hasSize(deletionsCount));
|
assertThat(deleteTemplateListeners, hasSize(deletionsCount));
|
||||||
|
@ -218,30 +237,34 @@ public class TemplateUpgradeServiceTests extends ESTestCase {
|
||||||
|
|
||||||
for (int i = 0; i < deletionsCount; i++) {
|
for (int i = 0; i < deletionsCount; i++) {
|
||||||
if (randomBoolean()) {
|
if (randomBoolean()) {
|
||||||
int prevUpdatesInProgress = service.getUpdatesInProgress();
|
int prevUpdatesInProgress = service.upgradesInProgress.get();
|
||||||
deleteTemplateListeners.get(i).onFailure(new RuntimeException("test - ignore"));
|
deleteTemplateListeners.get(i).onFailure(new RuntimeException("test - ignore"));
|
||||||
assertThat(prevUpdatesInProgress - service.getUpdatesInProgress(), equalTo(1));
|
assertThat(prevUpdatesInProgress - service.upgradesInProgress.get(), equalTo(1));
|
||||||
} else {
|
} else {
|
||||||
int prevUpdatesInProgress = service.getUpdatesInProgress();
|
int prevUpdatesInProgress = service.upgradesInProgress.get();
|
||||||
deleteTemplateListeners.get(i).onResponse(new DeleteIndexTemplateResponse(randomBoolean()) {
|
deleteTemplateListeners.get(i).onResponse(new DeleteIndexTemplateResponse(randomBoolean()) {
|
||||||
|
|
||||||
});
|
});
|
||||||
assertThat(prevUpdatesInProgress - service.getUpdatesInProgress(), equalTo(1));
|
assertThat(prevUpdatesInProgress - service.upgradesInProgress.get(), equalTo(1));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
assertThat(updatesInProgress - service.getUpdatesInProgress(), equalTo(additionsCount + deletionsCount));
|
// tryFinishUpgrade was skipped
|
||||||
|
assertThat(service.upgradesInProgress.get(), equalTo(2));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final Set<DiscoveryNode.Role> MASTER_DATA_ROLES =
|
private static final Set<DiscoveryNode.Role> MASTER_DATA_ROLES =
|
||||||
Collections.unmodifiableSet(EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA));
|
Collections.unmodifiableSet(EnumSet.of(DiscoveryNode.Role.MASTER, DiscoveryNode.Role.DATA));
|
||||||
|
|
||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
public void testClusterStateUpdate() {
|
public void testClusterStateUpdate() throws InterruptedException {
|
||||||
|
|
||||||
AtomicReference<ActionListener<PutIndexTemplateResponse>> addedListener = new AtomicReference<>();
|
final AtomicReference<ActionListener<PutIndexTemplateResponse>> addedListener = new AtomicReference<>();
|
||||||
AtomicReference<ActionListener<PutIndexTemplateResponse>> changedListener = new AtomicReference<>();
|
final AtomicReference<ActionListener<PutIndexTemplateResponse>> changedListener = new AtomicReference<>();
|
||||||
AtomicReference<ActionListener<DeleteIndexTemplateResponse>> removedListener = new AtomicReference<>();
|
final AtomicReference<ActionListener<DeleteIndexTemplateResponse>> removedListener = new AtomicReference<>();
|
||||||
AtomicInteger updateInvocation = new AtomicInteger();
|
final Semaphore updateInvocation = new Semaphore(0);
|
||||||
|
final Semaphore calculateInvocation = new Semaphore(0);
|
||||||
|
final Semaphore changedInvocation = new Semaphore(0);
|
||||||
|
final Semaphore finishInvocation = new Semaphore(0);
|
||||||
|
|
||||||
MetaData metaData = randomMetaData(
|
MetaData metaData = randomMetaData(
|
||||||
IndexTemplateMetaData.builder("user_template").patterns(randomIndexPatterns()).build(),
|
IndexTemplateMetaData.builder("user_template").patterns(randomIndexPatterns()).build(),
|
||||||
|
@ -249,21 +272,6 @@ public class TemplateUpgradeServiceTests extends ESTestCase {
|
||||||
IndexTemplateMetaData.builder("changed_test_template").patterns(randomIndexPatterns()).build()
|
IndexTemplateMetaData.builder("changed_test_template").patterns(randomIndexPatterns()).build()
|
||||||
);
|
);
|
||||||
|
|
||||||
ThreadPool threadPool = mock(ThreadPool.class);
|
|
||||||
ThreadContext threadContext = new ThreadContext(Settings.EMPTY);
|
|
||||||
when(threadPool.getThreadContext()).thenReturn(threadContext);
|
|
||||||
ExecutorService executorService = mock(ExecutorService.class);
|
|
||||||
when(threadPool.generic()).thenReturn(executorService);
|
|
||||||
doAnswer(invocation -> {
|
|
||||||
Object[] args = invocation.getArguments();
|
|
||||||
assert args.length == 1;
|
|
||||||
assertTrue(threadContext.isSystemContext());
|
|
||||||
Runnable runnable = (Runnable) args[0];
|
|
||||||
runnable.run();
|
|
||||||
updateInvocation.incrementAndGet();
|
|
||||||
return null;
|
|
||||||
}).when(executorService).execute(any(Runnable.class));
|
|
||||||
|
|
||||||
Client mockClient = mock(Client.class);
|
Client mockClient = mock(Client.class);
|
||||||
AdminClient mockAdminClient = mock(AdminClient.class);
|
AdminClient mockAdminClient = mock(AdminClient.class);
|
||||||
IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class);
|
IndicesAdminClient mockIndicesAdminClient = mock(IndicesAdminClient.class);
|
||||||
|
@ -293,7 +301,7 @@ public class TemplateUpgradeServiceTests extends ESTestCase {
|
||||||
return null;
|
return null;
|
||||||
}).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class));
|
}).when(mockIndicesAdminClient).deleteTemplate(any(DeleteIndexTemplateRequest.class), any(ActionListener.class));
|
||||||
|
|
||||||
TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool,
|
final TemplateUpgradeService service = new TemplateUpgradeService(Settings.EMPTY, mockClient, clusterService, threadPool,
|
||||||
Arrays.asList(
|
Arrays.asList(
|
||||||
templates -> {
|
templates -> {
|
||||||
assertNull(templates.put("added_test_template", IndexTemplateMetaData.builder("added_test_template")
|
assertNull(templates.put("added_test_template", IndexTemplateMetaData.builder("added_test_template")
|
||||||
|
@ -309,26 +317,63 @@ public class TemplateUpgradeServiceTests extends ESTestCase {
|
||||||
.patterns(Collections.singletonList("*")).order(10).build()));
|
.patterns(Collections.singletonList("*")).order(10).build()));
|
||||||
return templates;
|
return templates;
|
||||||
}
|
}
|
||||||
));
|
)) {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void tryFinishUpgrade(AtomicBoolean anyUpgradeFailed) {
|
||||||
|
super.tryFinishUpgrade(anyUpgradeFailed);
|
||||||
|
finishInvocation.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void upgradeTemplates(Map<String, BytesReference> changes, Set<String> deletions) {
|
||||||
|
super.upgradeTemplates(changes, deletions);
|
||||||
|
updateInvocation.release();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
Optional<Tuple<Map<String, BytesReference>, Set<String>>>
|
||||||
|
calculateTemplateChanges(ImmutableOpenMap<String, IndexTemplateMetaData> templates) {
|
||||||
|
final Optional<Tuple<Map<String, BytesReference>, Set<String>>> ans = super.calculateTemplateChanges(templates);
|
||||||
|
calculateInvocation.release();
|
||||||
|
return ans;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void clusterChanged(ClusterChangedEvent event) {
|
||||||
|
super.clusterChanged(event);
|
||||||
|
changedInvocation.release();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
ClusterState prevState = ClusterState.EMPTY_STATE;
|
ClusterState prevState = ClusterState.EMPTY_STATE;
|
||||||
ClusterState state = ClusterState.builder(prevState).nodes(DiscoveryNodes.builder()
|
ClusterState state = ClusterState.builder(prevState).nodes(DiscoveryNodes.builder()
|
||||||
.add(new DiscoveryNode("node1", "node1", buildNewFakeTransportAddress(), emptyMap(), MASTER_DATA_ROLES, Version.CURRENT)
|
.add(new DiscoveryNode("node1", "node1", buildNewFakeTransportAddress(), emptyMap(), MASTER_DATA_ROLES, Version.CURRENT)
|
||||||
).localNodeId("node1").masterNodeId("node1").build()
|
).localNodeId("node1").masterNodeId("node1").build()
|
||||||
).metaData(metaData).build();
|
).metaData(metaData).build();
|
||||||
service.clusterChanged(new ClusterChangedEvent("test", state, prevState));
|
setState(clusterService, state);
|
||||||
|
|
||||||
assertThat(updateInvocation.get(), equalTo(1));
|
changedInvocation.acquire();
|
||||||
|
assertThat(changedInvocation.availablePermits(), equalTo(0));
|
||||||
|
calculateInvocation.acquire();
|
||||||
|
assertThat(calculateInvocation.availablePermits(), equalTo(0));
|
||||||
|
updateInvocation.acquire();
|
||||||
|
assertThat(updateInvocation.availablePermits(), equalTo(0));
|
||||||
|
assertThat(finishInvocation.availablePermits(), equalTo(0));
|
||||||
assertThat(addedListener.get(), notNullValue());
|
assertThat(addedListener.get(), notNullValue());
|
||||||
assertThat(changedListener.get(), notNullValue());
|
assertThat(changedListener.get(), notNullValue());
|
||||||
assertThat(removedListener.get(), notNullValue());
|
assertThat(removedListener.get(), notNullValue());
|
||||||
|
|
||||||
prevState = state;
|
prevState = state;
|
||||||
state = ClusterState.builder(prevState).metaData(MetaData.builder(state.metaData()).removeTemplate("user_template")).build();
|
state = ClusterState.builder(prevState).metaData(MetaData.builder(state.metaData()).removeTemplate("user_template")).build();
|
||||||
service.clusterChanged(new ClusterChangedEvent("test 2", state, prevState));
|
setState(clusterService, state);
|
||||||
|
|
||||||
// Make sure that update wasn't invoked since we are still running
|
// Make sure that update wasn't invoked since we are still running
|
||||||
assertThat(updateInvocation.get(), equalTo(1));
|
changedInvocation.acquire();
|
||||||
|
assertThat(changedInvocation.availablePermits(), equalTo(0));
|
||||||
|
assertThat(calculateInvocation.availablePermits(), equalTo(0));
|
||||||
|
assertThat(updateInvocation.availablePermits(), equalTo(0));
|
||||||
|
assertThat(finishInvocation.availablePermits(), equalTo(0));
|
||||||
|
|
||||||
addedListener.getAndSet(null).onResponse(new PutIndexTemplateResponse(true) {
|
addedListener.getAndSet(null).onResponse(new PutIndexTemplateResponse(true) {
|
||||||
});
|
});
|
||||||
|
@ -337,19 +382,40 @@ public class TemplateUpgradeServiceTests extends ESTestCase {
|
||||||
removedListener.getAndSet(null).onResponse(new DeleteIndexTemplateResponse(true) {
|
removedListener.getAndSet(null).onResponse(new DeleteIndexTemplateResponse(true) {
|
||||||
});
|
});
|
||||||
|
|
||||||
service.clusterChanged(new ClusterChangedEvent("test 3", state, prevState));
|
// 3 upgrades should be completed, in addition to the final calculate
|
||||||
|
finishInvocation.acquire(3);
|
||||||
|
assertThat(finishInvocation.availablePermits(), equalTo(0));
|
||||||
|
calculateInvocation.acquire();
|
||||||
|
assertThat(calculateInvocation.availablePermits(), equalTo(0));
|
||||||
|
|
||||||
|
setState(clusterService, state);
|
||||||
|
|
||||||
// Make sure that update was called this time since we are no longer running
|
// Make sure that update was called this time since we are no longer running
|
||||||
assertThat(updateInvocation.get(), equalTo(2));
|
changedInvocation.acquire();
|
||||||
|
assertThat(changedInvocation.availablePermits(), equalTo(0));
|
||||||
|
calculateInvocation.acquire();
|
||||||
|
assertThat(calculateInvocation.availablePermits(), equalTo(0));
|
||||||
|
updateInvocation.acquire();
|
||||||
|
assertThat(updateInvocation.availablePermits(), equalTo(0));
|
||||||
|
assertThat(finishInvocation.availablePermits(), equalTo(0));
|
||||||
|
|
||||||
addedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore"));
|
addedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore"));
|
||||||
changedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore"));
|
changedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore"));
|
||||||
removedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore"));
|
removedListener.getAndSet(null).onFailure(new RuntimeException("test - ignore"));
|
||||||
|
|
||||||
service.clusterChanged(new ClusterChangedEvent("test 3", state, prevState));
|
finishInvocation.acquire(3);
|
||||||
|
assertThat(finishInvocation.availablePermits(), equalTo(0));
|
||||||
|
calculateInvocation.acquire();
|
||||||
|
assertThat(calculateInvocation.availablePermits(), equalTo(0));
|
||||||
|
|
||||||
|
setState(clusterService, state);
|
||||||
|
|
||||||
// Make sure that update wasn't called this time since the index template metadata didn't change
|
// Make sure that update wasn't called this time since the index template metadata didn't change
|
||||||
assertThat(updateInvocation.get(), equalTo(2));
|
changedInvocation.acquire();
|
||||||
|
assertThat(changedInvocation.availablePermits(), equalTo(0));
|
||||||
|
assertThat(calculateInvocation.availablePermits(), equalTo(0));
|
||||||
|
assertThat(updateInvocation.availablePermits(), equalTo(0));
|
||||||
|
assertThat(finishInvocation.availablePermits(), equalTo(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final int NODE_TEST_ITERS = 100;
|
private static final int NODE_TEST_ITERS = 100;
|
||||||
|
|
|
@ -992,24 +992,22 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Settings customAuditIndexSettings(Settings nodeSettings, Logger logger) {
|
public static Settings customAuditIndexSettings(Settings nodeSettings, Logger logger) {
|
||||||
Settings newSettings = Settings.builder()
|
final Settings newSettings = Settings.builder()
|
||||||
.put(INDEX_SETTINGS.get(nodeSettings), false)
|
.put(INDEX_SETTINGS.get(nodeSettings), false)
|
||||||
|
.normalizePrefix(IndexMetaData.INDEX_SETTING_PREFIX)
|
||||||
.build();
|
.build();
|
||||||
if (newSettings.names().isEmpty()) {
|
if (newSettings.names().isEmpty()) {
|
||||||
return Settings.EMPTY;
|
return Settings.EMPTY;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Filter out forbidden settings:
|
// Filter out forbidden setting
|
||||||
Settings.Builder builder = Settings.builder();
|
return Settings.builder().put(newSettings.filter(name -> {
|
||||||
builder.put(newSettings.filter(k -> {
|
|
||||||
String name = "index." + k;
|
|
||||||
if (FORBIDDEN_INDEX_SETTING.equals(name)) {
|
if (FORBIDDEN_INDEX_SETTING.equals(name)) {
|
||||||
logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name);
|
logger.warn("overriding the default [{}} setting is forbidden. ignoring...", name);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}));
|
})).build();
|
||||||
return builder.build();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private void putTemplate(Settings customSettings, Consumer<Exception> consumer) {
|
private void putTemplate(Settings customSettings, Consumer<Exception> consumer) {
|
||||||
|
|
|
@ -6,10 +6,14 @@
|
||||||
package org.elasticsearch.xpack.security.audit.index;
|
package org.elasticsearch.xpack.security.audit.index;
|
||||||
|
|
||||||
import org.apache.lucene.util.SetOnce;
|
import org.apache.lucene.util.SetOnce;
|
||||||
|
import org.elasticsearch.action.ActionFuture;
|
||||||
import org.elasticsearch.action.IndicesRequest;
|
import org.elasticsearch.action.IndicesRequest;
|
||||||
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
import org.elasticsearch.action.admin.cluster.health.ClusterHealthResponse;
|
||||||
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
import org.elasticsearch.action.admin.cluster.node.info.NodesInfoResponse;
|
||||||
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
import org.elasticsearch.action.admin.indices.settings.get.GetSettingsResponse;
|
||||||
|
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesRequest;
|
||||||
|
import org.elasticsearch.action.admin.indices.template.get.GetIndexTemplatesResponse;
|
||||||
|
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||||
import org.elasticsearch.action.search.SearchResponse;
|
import org.elasticsearch.action.search.SearchResponse;
|
||||||
import org.elasticsearch.action.support.IndicesOptions;
|
import org.elasticsearch.action.support.IndicesOptions;
|
||||||
import org.elasticsearch.client.Client;
|
import org.elasticsearch.client.Client;
|
||||||
|
@ -17,6 +21,8 @@ import org.elasticsearch.client.Requests;
|
||||||
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
import org.elasticsearch.client.transport.NoNodeAvailableException;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
import org.elasticsearch.cluster.health.ClusterHealthStatus;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
|
import org.elasticsearch.cluster.metadata.IndexTemplateMetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
import org.elasticsearch.cluster.node.DiscoveryNodes;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
|
@ -29,6 +35,7 @@ import org.elasticsearch.common.settings.Settings;
|
||||||
import org.elasticsearch.common.transport.TransportAddress;
|
import org.elasticsearch.common.transport.TransportAddress;
|
||||||
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
import org.elasticsearch.common.util.concurrent.EsExecutors;
|
||||||
import org.elasticsearch.common.util.set.Sets;
|
import org.elasticsearch.common.util.set.Sets;
|
||||||
|
import org.elasticsearch.plugins.MetaDataUpgrader;
|
||||||
import org.elasticsearch.plugins.Plugin;
|
import org.elasticsearch.plugins.Plugin;
|
||||||
import org.elasticsearch.rest.RestRequest;
|
import org.elasticsearch.rest.RestRequest;
|
||||||
import org.elasticsearch.search.SearchHit;
|
import org.elasticsearch.search.SearchHit;
|
||||||
|
@ -70,7 +77,9 @@ import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.function.Function;
|
import java.util.function.Function;
|
||||||
|
import static java.util.Collections.emptyMap;
|
||||||
|
|
||||||
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
|
import static org.elasticsearch.test.ESIntegTestCase.Scope.SUITE;
|
||||||
import static org.elasticsearch.test.InternalTestCluster.clusterName;
|
import static org.elasticsearch.test.InternalTestCluster.clusterName;
|
||||||
|
@ -85,6 +94,7 @@ import static org.hamcrest.Matchers.hasToString;
|
||||||
import static org.hamcrest.Matchers.is;
|
import static org.hamcrest.Matchers.is;
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
import static org.hamcrest.Matchers.notNullValue;
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
import static org.hamcrest.Matchers.nullValue;
|
||||||
|
import static org.hamcrest.Matchers.hasSize;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ -360,6 +370,21 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
|
||||||
auditor.start();
|
auditor.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testIndexTemplateUpgrader() throws Exception {
|
||||||
|
final MetaDataUpgrader metaDataUpgrader = internalCluster().getInstance(MetaDataUpgrader.class);
|
||||||
|
final Map<String, IndexTemplateMetaData> updatedTemplates = metaDataUpgrader.indexTemplateMetaDataUpgraders.apply(emptyMap());
|
||||||
|
final IndexTemplateMetaData indexAuditTrailTemplate = updatedTemplates.get(IndexAuditTrail.INDEX_TEMPLATE_NAME);
|
||||||
|
assertThat(indexAuditTrailTemplate, notNullValue());
|
||||||
|
// test custom index settings override template
|
||||||
|
assertThat(IndexMetaData.INDEX_NUMBER_OF_REPLICAS_SETTING.get(indexAuditTrailTemplate.settings()), is(numReplicas));
|
||||||
|
assertThat(IndexMetaData.INDEX_NUMBER_OF_SHARDS_SETTING.get(indexAuditTrailTemplate.settings()), is(numShards));
|
||||||
|
// test upgrade template and installed template are equal
|
||||||
|
final GetIndexTemplatesRequest request = new GetIndexTemplatesRequest(IndexAuditTrail.INDEX_TEMPLATE_NAME);
|
||||||
|
final GetIndexTemplatesResponse response = client().admin().indices().getTemplates(request).get();
|
||||||
|
assertThat(response.getIndexTemplates(), hasSize(1));
|
||||||
|
assertThat(indexAuditTrailTemplate, is(response.getIndexTemplates().get(0)));
|
||||||
|
}
|
||||||
|
|
||||||
public void testProcessorsSetting() {
|
public void testProcessorsSetting() {
|
||||||
final boolean explicitProcessors = randomBoolean();
|
final boolean explicitProcessors = randomBoolean();
|
||||||
final int processors;
|
final int processors;
|
||||||
|
|
Loading…
Reference in New Issue