[Transform] Fix possible audit logging disappearance after rolling upgrade (#49731) (#49767)

ensure audit index template is available during a rolling upgrade before a
transform task can write to it.

fixes #49730
This commit is contained in:
Hendrik Muhs 2019-12-03 18:05:06 +01:00 committed by GitHub
parent a3f88595d7
commit 7aae212287
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 177 additions and 13 deletions

View File

@ -32,7 +32,8 @@ public final class TransformInternalIndexConstants {
public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*"; public static final String INDEX_NAME_PATTERN_DEPRECATED = ".data-frame-internal-*";
// audit index // audit index
public static final String AUDIT_TEMPLATE_VERSION = "000001"; // gh #49730: upped version of audit index to 000002
public static final String AUDIT_TEMPLATE_VERSION = "000002";
public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-"; public static final String AUDIT_INDEX_PREFIX = ".transform-notifications-";
public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*"; public static final String AUDIT_INDEX_PATTERN = AUDIT_INDEX_PREFIX + "*";
public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1"; public static final String AUDIT_INDEX_DEPRECATED = ".data-frame-notifications-1";

View File

@ -317,21 +317,35 @@ public final class TransformInternalIndex {
return builder.startObject("_meta").field("version", Version.CURRENT).endObject(); return builder.startObject("_meta").field("version", Version.CURRENT).endObject();
} }
public static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
}
/** /**
* This method should be called before any document is indexed that relies on the * This method should be called before any document is indexed that relies on the
* existence of the latest index template to create the internal index. The * existence of the latest index templates to create the internal and audit index.
* reason is that the standard template upgrader only runs when the master node * The reason is that the standard template upgrader only runs when the master node
* is upgraded to the newer version. If data nodes are upgraded before master * is upgraded to the newer version. If data nodes are upgraded before master
* nodes and transforms get assigned to those data nodes then without this check * nodes and transforms get assigned to those data nodes then without this check
* the data nodes will index documents into the internal index before the necessary * the data nodes will index documents into the internal index before the necessary
* index template is present and this will result in an index with completely * index template is present and this will result in an index with completely
* dynamic mappings being created (which is very bad). * dynamic mappings being created (which is very bad).
*/ */
public static void installLatestVersionedIndexTemplateIfRequired( public static void installLatestIndexTemplatesIfRequired(ClusterService clusterService, Client client, ActionListener<Void> listener) {
installLatestVersionedIndexTemplateIfRequired(
clusterService,
client,
ActionListener.wrap(r -> { installLatestAuditIndexTemplateIfRequired(clusterService, client, listener); }, listener::onFailure)
);
}
protected static boolean haveLatestVersionedIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME);
}
protected static boolean haveLatestAuditIndexTemplate(ClusterState state) {
return state.getMetaData().getTemplates().containsKey(TransformInternalIndexConstants.AUDIT_INDEX);
}
protected static void installLatestVersionedIndexTemplateIfRequired(
ClusterService clusterService, ClusterService clusterService,
Client client, Client client,
ActionListener<Void> listener ActionListener<Void> listener
@ -367,5 +381,40 @@ public final class TransformInternalIndex {
} }
} }
protected static void installLatestAuditIndexTemplateIfRequired(
ClusterService clusterService,
Client client,
ActionListener<Void> listener
) {
// The check for existence of the template is against local cluster state, so very cheap
if (haveLatestAuditIndexTemplate(clusterService.state())) {
listener.onResponse(null);
return;
}
// Installing the template involves communication with the master node, so it's more expensive but much rarer
try {
IndexTemplateMetaData indexTemplateMetaData = getAuditIndexTemplateMetaData();
BytesReference jsonMappings = new BytesArray(indexTemplateMetaData.mappings().get(SINGLE_MAPPING_NAME).uncompressed());
PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns(
indexTemplateMetaData.patterns()
)
.version(indexTemplateMetaData.version())
.settings(indexTemplateMetaData.settings())
.mapping(SINGLE_MAPPING_NAME, XContentHelper.convertToMap(jsonMappings, true, XContentType.JSON).v2());
ActionListener<AcknowledgedResponse> innerListener = ActionListener.wrap(r -> listener.onResponse(null), listener::onFailure);
executeAsyncWithOrigin(
client.threadPool().getThreadContext(),
TRANSFORM_ORIGIN,
request,
innerListener,
client.admin().indices()::putTemplate
);
} catch (IOException e) {
listener.onFailure(e);
}
}
private TransformInternalIndex() {} private TransformInternalIndex() {}
} }

View File

@ -283,8 +283,8 @@ public class TransformPersistentTasksExecutor extends PersistentTasksExecutor<Tr
} }
); );
// <1> Check the internal index template is installed // <1> Check the index templates are installed
TransformInternalIndex.installLatestVersionedIndexTemplateIfRequired(clusterService, client, templateCheckListener); TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, templateCheckListener);
} }
private static IndexerState currentIndexerState(TransformState previousState) { private static IndexerState currentIndexerState(TransformState previousState) {

View File

@ -38,6 +38,7 @@ import static org.mockito.Mockito.when;
public class TransformInternalIndexTests extends ESTestCase { public class TransformInternalIndexTests extends ESTestCase {
public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE; public static ClusterState STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE;
public static ClusterState STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE;
static { static {
ImmutableOpenMap.Builder<String, IndexTemplateMetaData> mapBuilder = ImmutableOpenMap.builder(); ImmutableOpenMap.Builder<String, IndexTemplateMetaData> mapBuilder = ImmutableOpenMap.builder();
@ -51,6 +52,18 @@ public class TransformInternalIndexTests extends ESTestCase {
ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT); ClusterState.Builder csBuilder = ClusterState.builder(ClusterName.DEFAULT);
csBuilder.metaData(metaBuilder.build()); csBuilder.metaData(metaBuilder.build());
STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build(); STATE_WITH_LATEST_VERSIONED_INDEX_TEMPLATE = csBuilder.build();
mapBuilder = ImmutableOpenMap.builder();
try {
mapBuilder.put(TransformInternalIndexConstants.AUDIT_INDEX, TransformInternalIndex.getAuditIndexTemplateMetaData());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
metaBuilder = MetaData.builder();
metaBuilder.templates(mapBuilder.build());
csBuilder = ClusterState.builder(ClusterName.DEFAULT);
csBuilder.metaData(metaBuilder.build());
STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE = csBuilder.build();
} }
public void testHaveLatestVersionedIndexTemplate() { public void testHaveLatestVersionedIndexTemplate() {
@ -81,8 +94,7 @@ public class TransformInternalIndexTests extends ESTestCase {
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE); when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
IndicesAdminClient indicesClient = mock(IndicesAdminClient.class); IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer( doAnswer(invocationOnMock -> {
invocationOnMock -> {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1]; ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true)); listener.onResponse(new AcknowledgedResponse(true));
@ -112,4 +124,100 @@ public class TransformInternalIndexTests extends ESTestCase {
verify(indicesClient, times(1)).putTemplate(any(), any()); verify(indicesClient, times(1)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient); verifyNoMoreInteractions(indicesClient);
} }
public void testHaveLatestAuditIndexTemplate() {
assertTrue(TransformInternalIndex.haveLatestAuditIndexTemplate(STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE));
assertFalse(TransformInternalIndex.haveLatestAuditIndexTemplate(ClusterState.EMPTY_STATE));
}
public void testInstallLatestAuditIndexTemplateIfRequired_GivenNotRequired() {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(TransformInternalIndexTests.STATE_WITH_LATEST_AUDIT_INDEX_TEMPLATE);
Client client = mock(Client.class);
AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));
TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);
assertTrue(gotResponse.get());
verifyNoMoreInteractions(client);
}
public void testInstallLatestAuditIndexTemplateIfRequired_GivenRequired() {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
return null;
}).when(indicesClient).putTemplate(any(), any());
AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);
AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));
TransformInternalIndex.installLatestAuditIndexTemplateIfRequired(clusterService, client, testListener);
assertTrue(gotResponse.get());
verify(client, times(1)).threadPool();
verify(client, times(1)).admin();
verifyNoMoreInteractions(client);
verify(adminClient, times(1)).indices();
verifyNoMoreInteractions(adminClient);
verify(indicesClient, times(1)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}
public void testInstallLatestIndexTemplateIfRequired_GivenRequired() {
ClusterService clusterService = mock(ClusterService.class);
when(clusterService.state()).thenReturn(ClusterState.EMPTY_STATE);
IndicesAdminClient indicesClient = mock(IndicesAdminClient.class);
doAnswer(invocationOnMock -> {
@SuppressWarnings("unchecked")
ActionListener<AcknowledgedResponse> listener = (ActionListener<AcknowledgedResponse>) invocationOnMock.getArguments()[1];
listener.onResponse(new AcknowledgedResponse(true));
return null;
}).when(indicesClient).putTemplate(any(), any());
AdminClient adminClient = mock(AdminClient.class);
when(adminClient.indices()).thenReturn(indicesClient);
Client client = mock(Client.class);
when(client.admin()).thenReturn(adminClient);
ThreadPool threadPool = mock(ThreadPool.class);
when(threadPool.getThreadContext()).thenReturn(new ThreadContext(Settings.EMPTY));
when(client.threadPool()).thenReturn(threadPool);
AtomicBoolean gotResponse = new AtomicBoolean(false);
ActionListener<Void> testListener = ActionListener.wrap(aVoid -> gotResponse.set(true), e -> fail(e.getMessage()));
TransformInternalIndex.installLatestIndexTemplatesIfRequired(clusterService, client, testListener);
assertTrue(gotResponse.get());
verify(client, times(2)).threadPool();
verify(client, times(2)).admin();
verifyNoMoreInteractions(client);
verify(adminClient, times(2)).indices();
verifyNoMoreInteractions(adminClient);
verify(indicesClient, times(2)).putTemplate(any(), any());
verifyNoMoreInteractions(indicesClient);
}
} }

View File

@ -258,7 +258,7 @@ setup:
transform.delete_transform: transform.delete_transform:
transform_id: "mixed-simple-continuous-transform" transform_id: "mixed-simple-continuous-transform"
--- ---
"Test index mappings for latest internal index": "Test index mappings for latest internal index and audit index":
- do: - do:
transform.put_transform: transform.put_transform:
transform_id: "upgraded-simple-transform" transform_id: "upgraded-simple-transform"
@ -279,3 +279,9 @@ setup:
index: .transform-internal-004 index: .transform-internal-004
- match: { \.transform-internal-004.mappings.dynamic: "false" } - match: { \.transform-internal-004.mappings.dynamic: "false" }
- match: { \.transform-internal-004.mappings.properties.id.type: "keyword" } - match: { \.transform-internal-004.mappings.properties.id.type: "keyword" }
- do:
indices.get_mapping:
index: .transform-notifications-000002
- match: { \.transform-notifications-000002.mappings.dynamic: "false" }
- match: { \.transform-notifications-000002.mappings.properties.transform_id.type: "keyword" }
- match: { \.transform-notifications-000002.mappings.properties.timestamp.type: "date" }