Retry startup for IndexAuditTrail and version templates (elastic/x-pack-elasticsearch#2755)
This commit removes the FAILED state for the IndexAuditTrail so that we always try to keep starting the service. Previously, on any exception during startup we moved to a failed state and never tried to start again. The users only option was to restart the node. This was problematic in the case of large clusters as there could be common timeouts of cluster state listeners that would cause the startup of this service to fail. Additionally, the logic in the IndexAuditTrail to update the template on the current cluster has been removed and replaced with the use of the TemplateUpgradeService. However, we still need to maintain the ability to determine if a template on a remote cluster should be PUT. To avoid always PUTing the template, the version field has been added so it only needs to be PUT once on upgrade. Finally, the default queue size has been increased as this is another common issue that users hit with high traffic clusters. relates elastic/x-pack-elasticsearch#2658 Original commit: elastic/x-pack-elasticsearch@27e2ce7223
This commit is contained in:
parent
f69f6cd341
commit
4f65d9b527
|
@ -898,14 +898,28 @@ public class Security implements ActionPlugin, IngestPlugin, NetworkPlugin, Clus
|
||||||
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
|
public UnaryOperator<Map<String, IndexTemplateMetaData>> getIndexTemplateMetaDataUpgrader() {
|
||||||
return templates -> {
|
return templates -> {
|
||||||
final byte[] securityTemplate = TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json",
|
final byte[] securityTemplate = TemplateUtils.loadTemplate("/" + SECURITY_TEMPLATE_NAME + ".json",
|
||||||
Version.CURRENT.toString(), IndexLifecycleManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8);
|
Version.CURRENT.toString(), IndexLifecycleManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8);
|
||||||
final XContent xContent = XContentFactory.xContent(XContentType.JSON);
|
final XContent xContent = XContentFactory.xContent(XContentType.JSON);
|
||||||
|
|
||||||
try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, securityTemplate)) {
|
try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, securityTemplate)) {
|
||||||
templates.put(SECURITY_TEMPLATE_NAME, IndexTemplateMetaData.Builder.fromXContent(parser, SECURITY_TEMPLATE_NAME));
|
templates.put(SECURITY_TEMPLATE_NAME, IndexTemplateMetaData.Builder.fromXContent(parser, SECURITY_TEMPLATE_NAME));
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// TODO: should we handle this with a thrown exception?
|
// TODO: should we handle this with a thrown exception?
|
||||||
logger.error("Error loading security template [{}] as part of metadata upgrading", SECURITY_TEMPLATE_NAME);
|
logger.error("Error loading template [{}] as part of metadata upgrading", SECURITY_TEMPLATE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
final byte[] auditTemplate = TemplateUtils.loadTemplate("/" + IndexAuditTrail.INDEX_TEMPLATE_NAME + ".json",
|
||||||
|
Version.CURRENT.toString(), IndexLifecycleManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8);
|
||||||
|
|
||||||
|
try (XContentParser parser = xContent.createParser(NamedXContentRegistry.EMPTY, auditTemplate)) {
|
||||||
|
IndexTemplateMetaData auditMetadata = new IndexTemplateMetaData.Builder(
|
||||||
|
IndexTemplateMetaData.Builder.fromXContent(parser, IndexAuditTrail.INDEX_TEMPLATE_NAME))
|
||||||
|
.settings(IndexAuditTrail.customAuditIndexSettings(settings, logger))
|
||||||
|
.build();
|
||||||
|
templates.put(IndexAuditTrail.INDEX_TEMPLATE_NAME, auditMetadata);
|
||||||
|
} catch (IOException e) {
|
||||||
|
// TODO: should we handle this with a thrown exception?
|
||||||
|
logger.error("Error loading template [{}] as part of metadata upgrading", IndexAuditTrail.INDEX_TEMPLATE_NAME);
|
||||||
}
|
}
|
||||||
|
|
||||||
return templates;
|
return templates;
|
||||||
|
|
|
@ -88,7 +88,7 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
|
||||||
try {
|
try {
|
||||||
if (Security.indexAuditLoggingEnabled(settings) &&
|
if (Security.indexAuditLoggingEnabled(settings) &&
|
||||||
indexAuditTrail.state() == IndexAuditTrail.State.INITIALIZED) {
|
indexAuditTrail.state() == IndexAuditTrail.State.INITIALIZED) {
|
||||||
if (indexAuditTrail.canStart(event, event.localNodeMaster())) {
|
if (indexAuditTrail.canStart(event)) {
|
||||||
threadPool.generic().execute(new AbstractRunnable() {
|
threadPool.generic().execute(new AbstractRunnable() {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -99,7 +99,7 @@ public class SecurityLifecycleService extends AbstractComponent implements Clust
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void doRun() {
|
public void doRun() {
|
||||||
indexAuditTrail.start(event.localNodeMaster());
|
indexAuditTrail.start();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
|
@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger;
|
||||||
import org.apache.logging.log4j.message.ParameterizedMessage;
|
import org.apache.logging.log4j.message.ParameterizedMessage;
|
||||||
import org.apache.logging.log4j.util.Supplier;
|
import org.apache.logging.log4j.util.Supplier;
|
||||||
import org.elasticsearch.ElasticsearchException;
|
import org.elasticsearch.ElasticsearchException;
|
||||||
|
import org.elasticsearch.Version;
|
||||||
import org.elasticsearch.action.ActionListener;
|
import org.elasticsearch.action.ActionListener;
|
||||||
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
import org.elasticsearch.action.admin.cluster.state.ClusterStateResponse;
|
||||||
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
import org.elasticsearch.action.admin.indices.template.put.PutIndexTemplateRequest;
|
||||||
|
@ -20,7 +21,6 @@ import org.elasticsearch.client.Client;
|
||||||
import org.elasticsearch.client.transport.TransportClient;
|
import org.elasticsearch.client.transport.TransportClient;
|
||||||
import org.elasticsearch.cluster.ClusterChangedEvent;
|
import org.elasticsearch.cluster.ClusterChangedEvent;
|
||||||
import org.elasticsearch.cluster.ClusterState;
|
import org.elasticsearch.cluster.ClusterState;
|
||||||
import org.elasticsearch.cluster.ClusterStateListener;
|
|
||||||
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
import org.elasticsearch.cluster.metadata.IndexMetaData;
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
import org.elasticsearch.cluster.node.DiscoveryNode;
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
import org.elasticsearch.cluster.service.ClusterService;
|
||||||
|
@ -28,7 +28,6 @@ import org.elasticsearch.common.Nullable;
|
||||||
import org.elasticsearch.common.Strings;
|
import org.elasticsearch.common.Strings;
|
||||||
import org.elasticsearch.common.collect.Tuple;
|
import org.elasticsearch.common.collect.Tuple;
|
||||||
import org.elasticsearch.common.component.AbstractComponent;
|
import org.elasticsearch.common.component.AbstractComponent;
|
||||||
import org.elasticsearch.common.io.Streams;
|
|
||||||
import org.elasticsearch.common.network.NetworkAddress;
|
import org.elasticsearch.common.network.NetworkAddress;
|
||||||
import org.elasticsearch.common.settings.Setting;
|
import org.elasticsearch.common.settings.Setting;
|
||||||
import org.elasticsearch.common.settings.Setting.Property;
|
import org.elasticsearch.common.settings.Setting.Property;
|
||||||
|
@ -53,21 +52,22 @@ import org.elasticsearch.xpack.security.audit.AuditTrail;
|
||||||
import org.elasticsearch.xpack.security.authc.AuthenticationToken;
|
import org.elasticsearch.xpack.security.authc.AuthenticationToken;
|
||||||
import org.elasticsearch.xpack.security.authz.privilege.SystemPrivilege;
|
import org.elasticsearch.xpack.security.authz.privilege.SystemPrivilege;
|
||||||
import org.elasticsearch.xpack.security.rest.RemoteHostHeader;
|
import org.elasticsearch.xpack.security.rest.RemoteHostHeader;
|
||||||
|
import org.elasticsearch.xpack.security.support.IndexLifecycleManager;
|
||||||
import org.elasticsearch.xpack.security.transport.filter.SecurityIpFilterRule;
|
import org.elasticsearch.xpack.security.transport.filter.SecurityIpFilterRule;
|
||||||
import org.elasticsearch.xpack.security.user.SystemUser;
|
import org.elasticsearch.xpack.security.user.SystemUser;
|
||||||
import org.elasticsearch.xpack.security.user.User;
|
import org.elasticsearch.xpack.security.user.User;
|
||||||
import org.elasticsearch.xpack.security.user.XPackUser;
|
import org.elasticsearch.xpack.security.user.XPackUser;
|
||||||
|
import org.elasticsearch.xpack.template.TemplateUtils;
|
||||||
import org.joda.time.DateTime;
|
import org.joda.time.DateTime;
|
||||||
import org.joda.time.DateTimeZone;
|
import org.joda.time.DateTimeZone;
|
||||||
|
|
||||||
import java.io.ByteArrayOutputStream;
|
|
||||||
import java.io.Closeable;
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.InputStream;
|
|
||||||
import java.net.InetAddress;
|
import java.net.InetAddress;
|
||||||
import java.net.InetSocketAddress;
|
import java.net.InetSocketAddress;
|
||||||
import java.net.SocketAddress;
|
import java.net.SocketAddress;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
|
import java.nio.charset.StandardCharsets;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -103,7 +103,7 @@ import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.res
|
||||||
/**
|
/**
|
||||||
* Audit trail implementation that writes events into an index.
|
* Audit trail implementation that writes events into an index.
|
||||||
*/
|
*/
|
||||||
public class IndexAuditTrail extends AbstractComponent implements AuditTrail, ClusterStateListener {
|
public class IndexAuditTrail extends AbstractComponent implements AuditTrail {
|
||||||
|
|
||||||
public static final String NAME = "index";
|
public static final String NAME = "index";
|
||||||
public static final String INDEX_NAME_PREFIX = ".security_audit_log";
|
public static final String INDEX_NAME_PREFIX = ".security_audit_log";
|
||||||
|
@ -112,7 +112,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
|
|
||||||
private static final int DEFAULT_BULK_SIZE = 1000;
|
private static final int DEFAULT_BULK_SIZE = 1000;
|
||||||
private static final int MAX_BULK_SIZE = 10000;
|
private static final int MAX_BULK_SIZE = 10000;
|
||||||
private static final int DEFAULT_MAX_QUEUE_SIZE = 1000;
|
private static final int DEFAULT_MAX_QUEUE_SIZE = 10000;
|
||||||
private static final TimeValue DEFAULT_FLUSH_INTERVAL = TimeValue.timeValueSeconds(1);
|
private static final TimeValue DEFAULT_FLUSH_INTERVAL = TimeValue.timeValueSeconds(1);
|
||||||
private static final IndexNameResolver.Rollover DEFAULT_ROLLOVER = IndexNameResolver.Rollover.DAILY;
|
private static final IndexNameResolver.Rollover DEFAULT_ROLLOVER = IndexNameResolver.Rollover.DAILY;
|
||||||
private static final Setting<IndexNameResolver.Rollover> ROLLOVER_SETTING =
|
private static final Setting<IndexNameResolver.Rollover> ROLLOVER_SETTING =
|
||||||
|
@ -160,7 +160,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
private final Client client;
|
private final Client client;
|
||||||
private final QueueConsumer queueConsumer;
|
private final QueueConsumer queueConsumer;
|
||||||
private final ThreadPool threadPool;
|
private final ThreadPool threadPool;
|
||||||
private final AtomicBoolean putTemplatePending = new AtomicBoolean(false);
|
|
||||||
private final ClusterService clusterService;
|
private final ClusterService clusterService;
|
||||||
private final boolean indexToRemoteCluster;
|
private final boolean indexToRemoteCluster;
|
||||||
private final EnumSet<AuditLevel> events;
|
private final EnumSet<AuditLevel> events;
|
||||||
|
@ -214,10 +213,9 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
* </ol>
|
* </ol>
|
||||||
*
|
*
|
||||||
* @param event the {@link ClusterChangedEvent} containing the up to date cluster state
|
* @param event the {@link ClusterChangedEvent} containing the up to date cluster state
|
||||||
* @param master flag indicating if the current node is the master
|
|
||||||
* @return true if all requirements are met and the service can be started
|
* @return true if all requirements are met and the service can be started
|
||||||
*/
|
*/
|
||||||
public boolean canStart(ClusterChangedEvent event, boolean master) {
|
public boolean canStart(ClusterChangedEvent event) {
|
||||||
if (indexToRemoteCluster) {
|
if (indexToRemoteCluster) {
|
||||||
// just return true as we do not determine whether we can start or not based on the local cluster state, but must base it off
|
// just return true as we do not determine whether we can start or not based on the local cluster state, but must base it off
|
||||||
// of the remote cluster state and this method is called on the cluster state update thread, so we do not really want to
|
// of the remote cluster state and this method is called on the cluster state update thread, so we do not really want to
|
||||||
|
@ -225,11 +223,11 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
synchronized (this) {
|
synchronized (this) {
|
||||||
return canStart(event.state(), master);
|
return canStart(event.state());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean canStart(ClusterState clusterState, boolean master) {
|
private boolean canStart(ClusterState clusterState) {
|
||||||
if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
if (clusterState.blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK)) {
|
||||||
// wait until the gateway has recovered from disk, otherwise we think may not have audit indices
|
// wait until the gateway has recovered from disk, otherwise we think may not have audit indices
|
||||||
// but they may not have been restored from the cluster state on disk
|
// but they may not have been restored from the cluster state on disk
|
||||||
|
@ -237,8 +235,9 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!master && clusterState.metaData().templates().get(INDEX_TEMPLATE_NAME) == null) {
|
if (TemplateUtils.checkTemplateExistsAndVersionMatches(INDEX_TEMPLATE_NAME, IndexLifecycleManager.SECURITY_VERSION_STRING,
|
||||||
logger.debug("security audit index template [{}] does not exist, so service cannot start", INDEX_TEMPLATE_NAME);
|
clusterState, logger, Version.CURRENT::onOrAfter) == false) {
|
||||||
|
logger.debug("security audit index template [{}] is not up to date", INDEX_TEMPLATE_NAME);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -266,10 +265,8 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
* at the beginning of the method. The service's components are initialized and if the current node is the master, the index
|
* at the beginning of the method. The service's components are initialized and if the current node is the master, the index
|
||||||
* template will be stored. The state is moved {@link org.elasticsearch.xpack.security.audit.index.IndexAuditTrail.State#STARTED}
|
* template will be stored. The state is moved {@link org.elasticsearch.xpack.security.audit.index.IndexAuditTrail.State#STARTED}
|
||||||
* and before returning the queue of messages that came before the service started is drained.
|
* and before returning the queue of messages that came before the service started is drained.
|
||||||
*
|
|
||||||
* @param master flag indicating if the current node is master
|
|
||||||
*/
|
*/
|
||||||
public void start(boolean master) {
|
public void start() {
|
||||||
if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
|
if (state.compareAndSet(State.INITIALIZED, State.STARTING)) {
|
||||||
this.nodeHostName = clusterService.localNode().getHostName();
|
this.nodeHostName = clusterService.localNode().getHostName();
|
||||||
this.nodeHostAddress = clusterService.localNode().getHostAddress();
|
this.nodeHostAddress = clusterService.localNode().getHostAddress();
|
||||||
|
@ -277,55 +274,59 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
client.admin().cluster().prepareState().execute(new ActionListener<ClusterStateResponse>() {
|
client.admin().cluster().prepareState().execute(new ActionListener<ClusterStateResponse>() {
|
||||||
@Override
|
@Override
|
||||||
public void onResponse(ClusterStateResponse clusterStateResponse) {
|
public void onResponse(ClusterStateResponse clusterStateResponse) {
|
||||||
final boolean currentMaster = clusterService.state().getNodes().isLocalNodeElectedMaster();
|
if (canStart(clusterStateResponse.getState())) {
|
||||||
if (canStart(clusterStateResponse.getState(), currentMaster)) {
|
innerStart();
|
||||||
if (currentMaster) {
|
} else if (TemplateUtils.checkTemplateExistsAndVersionMatches(INDEX_TEMPLATE_NAME,
|
||||||
putTemplate(customAuditIndexSettings(settings), ActionListener.wrap((v) -> innerStart(),
|
IndexLifecycleManager.SECURITY_VERSION_STRING, clusterStateResponse.getState(), logger,
|
||||||
(e) -> state.set(State.FAILED)));
|
Version.CURRENT::onOrAfter) == false) {
|
||||||
} else {
|
putTemplate(customAuditIndexSettings(settings, logger), ActionListener.wrap((v) -> innerStart(),
|
||||||
innerStart();
|
(e) -> {
|
||||||
}
|
logger.error("failed to put audit trail template", e);
|
||||||
|
transitionStartingToInitialized();
|
||||||
|
}));
|
||||||
} else {
|
} else {
|
||||||
if (state.compareAndSet(State.STARTING, State.INITIALIZED) == false) {
|
|
||||||
throw new IllegalStateException("state transition from starting to initialized failed, current value: " +
|
|
||||||
state.get());
|
|
||||||
}
|
|
||||||
// for some reason we can't start up since the remote cluster is not fully setup. in this case
|
// for some reason we can't start up since the remote cluster is not fully setup. in this case
|
||||||
// we try to wait for yellow status (all primaries started up) this will also wait for
|
// we try to wait for yellow status (all primaries started up) this will also wait for
|
||||||
// state recovery etc.
|
// state recovery etc.
|
||||||
String indexName = getIndexName();
|
String indexName = getIndexName();
|
||||||
// if this index doesn't exists the call will fail with a not_found exception...
|
// if this index doesn't exists the call will fail with a not_found exception...
|
||||||
client.admin().cluster().prepareHealth().setIndices().setWaitForYellowStatus().execute(ActionListener.wrap(
|
client.admin().cluster().prepareHealth().setIndices().setWaitForYellowStatus().execute(ActionListener.wrap(
|
||||||
(x) -> start(master),
|
(x) -> start(),
|
||||||
(e) -> logger.error("failed to get wait for yellow status on index [" + indexName + "]", e))
|
(e) -> {
|
||||||
);
|
logger.error("failed to get wait for yellow status on index [" + indexName + "]", e);
|
||||||
|
transitionStartingToInitialized();
|
||||||
|
}));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void onFailure(Exception e) {
|
public void onFailure(Exception e) {
|
||||||
|
transitionStartingToInitialized();
|
||||||
logger.error("failed to get remote cluster state", e);
|
logger.error("failed to get remote cluster state", e);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
} else if (master) {
|
|
||||||
putTemplate(customAuditIndexSettings(settings), ActionListener.wrap((v) -> innerStart(),
|
|
||||||
(e) -> {
|
|
||||||
logger.error("failed to put audit trail template", e);
|
|
||||||
state.set(State.FAILED);
|
|
||||||
}));
|
|
||||||
} else {
|
} else {
|
||||||
innerStart();
|
innerStart();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void innerStart() {
|
private void transitionStartingToInitialized() {
|
||||||
if (indexToRemoteCluster == false) {
|
if (state.compareAndSet(State.STARTING, State.INITIALIZED) == false) {
|
||||||
this.clusterService.addListener(this);
|
final String message = "state transition from starting to initialized failed, current value: " + state.get();
|
||||||
|
assert false : message;
|
||||||
|
logger.error(message);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void innerStart() {
|
||||||
initializeBulkProcessor();
|
initializeBulkProcessor();
|
||||||
queueConsumer.start();
|
queueConsumer.start();
|
||||||
state.set(State.STARTED);
|
if (state.compareAndSet(State.STARTING, State.STARTED) == false) {
|
||||||
|
final String message = "state transition from starting to start ed failed, current value: " + state.get();
|
||||||
|
assert false : message;
|
||||||
|
logger.error(message);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public synchronized void stop() {
|
public synchronized void stop() {
|
||||||
|
@ -825,7 +826,7 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
return transportClient;
|
return transportClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
Settings customAuditIndexSettings(Settings nodeSettings) {
|
public static Settings customAuditIndexSettings(Settings nodeSettings, Logger logger) {
|
||||||
Settings newSettings = Settings.builder()
|
Settings newSettings = Settings.builder()
|
||||||
.put(INDEX_SETTINGS.get(nodeSettings), false)
|
.put(INDEX_SETTINGS.get(nodeSettings), false)
|
||||||
.build();
|
.build();
|
||||||
|
@ -847,10 +848,9 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
}
|
}
|
||||||
|
|
||||||
void putTemplate(Settings customSettings, ActionListener<Void> listener) {
|
void putTemplate(Settings customSettings, ActionListener<Void> listener) {
|
||||||
try (InputStream is = getClass().getResourceAsStream("/" + INDEX_TEMPLATE_NAME + ".json")) {
|
try {
|
||||||
ByteArrayOutputStream out = new ByteArrayOutputStream();
|
final byte[] template = TemplateUtils.loadTemplate("/" + INDEX_TEMPLATE_NAME + ".json",
|
||||||
Streams.copy(is, out);
|
Version.CURRENT.toString(), IndexLifecycleManager.TEMPLATE_VERSION_PATTERN).getBytes(StandardCharsets.UTF_8);
|
||||||
final byte[] template = out.toByteArray();
|
|
||||||
final PutIndexTemplateRequest request = new PutIndexTemplateRequest(INDEX_TEMPLATE_NAME).source(template, XContentType.JSON);
|
final PutIndexTemplateRequest request = new PutIndexTemplateRequest(INDEX_TEMPLATE_NAME).source(template, XContentType.JSON);
|
||||||
if (customSettings != null && customSettings.names().size() > 0) {
|
if (customSettings != null && customSettings.names().size() > 0) {
|
||||||
Settings updatedSettings = Settings.builder()
|
Settings updatedSettings = Settings.builder()
|
||||||
|
@ -942,33 +942,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
// this could be handled by a template registry service but adding that is extra complexity until we actually need it
|
|
||||||
@Override
|
|
||||||
public void clusterChanged(ClusterChangedEvent clusterChangedEvent) {
|
|
||||||
assert indexToRemoteCluster == false;
|
|
||||||
if (state() == State.STARTED
|
|
||||||
&& clusterChangedEvent.localNodeMaster()
|
|
||||||
&& clusterChangedEvent.state().metaData().templates().get(INDEX_TEMPLATE_NAME) == null
|
|
||||||
&& putTemplatePending.compareAndSet(false, true)) {
|
|
||||||
logger.debug("security audit index template [{}] does not exist. it may have been deleted - putting the template",
|
|
||||||
INDEX_TEMPLATE_NAME);
|
|
||||||
|
|
||||||
putTemplate(customAuditIndexSettings(settings), new ActionListener<Void>() {
|
|
||||||
@Override
|
|
||||||
public void onResponse(Void aVoid) {
|
|
||||||
putTemplatePending.set(false);
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void onFailure(Exception e) {
|
|
||||||
putTemplatePending.set(false);
|
|
||||||
logger.error((Supplier<?>) () -> new ParameterizedMessage(
|
|
||||||
"failed to update security audit index template [{}]", INDEX_TEMPLATE_NAME), e);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// method for testing to allow different plugins such as mock transport...
|
// method for testing to allow different plugins such as mock transport...
|
||||||
List<Class<? extends Plugin>> remoteTransportClientPlugins() {
|
List<Class<? extends Plugin>> remoteTransportClientPlugins() {
|
||||||
return Collections.singletonList(XPackPlugin.class);
|
return Collections.singletonList(XPackPlugin.class);
|
||||||
|
@ -1097,7 +1070,6 @@ public class IndexAuditTrail extends AbstractComponent implements AuditTrail, Cl
|
||||||
STARTING,
|
STARTING,
|
||||||
STARTED,
|
STARTED,
|
||||||
STOPPING,
|
STOPPING,
|
||||||
STOPPED,
|
STOPPED
|
||||||
FAILED
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -52,7 +52,7 @@ public class IndexLifecycleManager extends AbstractComponent {
|
||||||
|
|
||||||
public static final String INTERNAL_SECURITY_INDEX = ".security-v6";
|
public static final String INTERNAL_SECURITY_INDEX = ".security-v6";
|
||||||
public static final int INTERNAL_INDEX_FORMAT = 6;
|
public static final int INTERNAL_INDEX_FORMAT = 6;
|
||||||
private static final String SECURITY_VERSION_STRING = "security-version";
|
public static final String SECURITY_VERSION_STRING = "security-version";
|
||||||
public static final String TEMPLATE_VERSION_PATTERN =
|
public static final String TEMPLATE_VERSION_PATTERN =
|
||||||
Pattern.quote("${security.template.version}");
|
Pattern.quote("${security.template.version}");
|
||||||
public static int NEW_INDEX_VERSION = IndexUpgradeCheck.UPRADE_VERSION;
|
public static int NEW_INDEX_VERSION = IndexUpgradeCheck.UPRADE_VERSION;
|
||||||
|
|
|
@ -1,11 +1,14 @@
|
||||||
{
|
{
|
||||||
"index_patterns": ".security_audit_log*",
|
"index_patterns": [ ".security_audit_log*" ],
|
||||||
"order": 2147483647,
|
"order": 2147483647,
|
||||||
"settings": {
|
"settings": {
|
||||||
"index.format": 6
|
"index.format": 6
|
||||||
},
|
},
|
||||||
"mappings": {
|
"mappings": {
|
||||||
"doc": {
|
"doc": {
|
||||||
|
"_meta": {
|
||||||
|
"security-version": "${security.template.version}"
|
||||||
|
},
|
||||||
"dynamic" : "strict",
|
"dynamic" : "strict",
|
||||||
"properties": {
|
"properties": {
|
||||||
"@timestamp": {
|
"@timestamp": {
|
||||||
|
|
|
@ -26,7 +26,6 @@ import org.elasticsearch.threadpool.TestThreadPool;
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
import org.elasticsearch.threadpool.ThreadPool;
|
||||||
import org.elasticsearch.transport.MockTransportClient;
|
import org.elasticsearch.transport.MockTransportClient;
|
||||||
import org.elasticsearch.transport.TransportMessage;
|
import org.elasticsearch.transport.TransportMessage;
|
||||||
import org.elasticsearch.xpack.security.InternalClient;
|
|
||||||
import org.elasticsearch.xpack.security.InternalSecurityClient;
|
import org.elasticsearch.xpack.security.InternalSecurityClient;
|
||||||
import org.elasticsearch.xpack.security.audit.index.IndexAuditTrail.State;
|
import org.elasticsearch.xpack.security.audit.index.IndexAuditTrail.State;
|
||||||
import org.elasticsearch.xpack.security.authc.AuthenticationToken;
|
import org.elasticsearch.xpack.security.authc.AuthenticationToken;
|
||||||
|
@ -314,7 +313,7 @@ public class IndexAuditTrailMutedTests extends ESTestCase {
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
auditTrail.start(true);
|
auditTrail.start();
|
||||||
assertThat(auditTrail.state(), is(State.STARTED));
|
assertThat(auditTrail.state(), is(State.STARTED));
|
||||||
return auditTrail;
|
return auditTrail;
|
||||||
}
|
}
|
||||||
|
|
|
@ -89,14 +89,14 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
|
||||||
private static boolean useSSL;
|
private static boolean useSSL;
|
||||||
private static InternalTestCluster remoteCluster;
|
private static InternalTestCluster remoteCluster;
|
||||||
private static Settings remoteSettings;
|
private static Settings remoteSettings;
|
||||||
|
private static int numShards = -1;
|
||||||
|
private static int numReplicas = -1;
|
||||||
|
|
||||||
private TransportAddress remoteAddress = buildNewFakeTransportAddress();
|
private TransportAddress remoteAddress = buildNewFakeTransportAddress();
|
||||||
private TransportAddress localAddress = new TransportAddress(InetAddress.getLoopbackAddress(), 0);
|
private TransportAddress localAddress = new TransportAddress(InetAddress.getLoopbackAddress(), 0);
|
||||||
private IndexNameResolver.Rollover rollover;
|
private IndexNameResolver.Rollover rollover;
|
||||||
private IndexAuditTrail auditor;
|
private IndexAuditTrail auditor;
|
||||||
private SetOnce<Message> enqueuedMessage;
|
private SetOnce<Message> enqueuedMessage;
|
||||||
private int numShards;
|
|
||||||
private int numReplicas;
|
|
||||||
private ThreadPool threadPool;
|
private ThreadPool threadPool;
|
||||||
private boolean includeRequestBody;
|
private boolean includeRequestBody;
|
||||||
|
|
||||||
|
@ -123,6 +123,22 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
|
||||||
return useSSL;
|
return useSSL;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Settings nodeSettings(int nodeOrdinal) {
|
||||||
|
if (numShards == -1) {
|
||||||
|
numShards = numberOfShards();
|
||||||
|
}
|
||||||
|
if (numReplicas == -1) {
|
||||||
|
numReplicas = numberOfReplicas();
|
||||||
|
}
|
||||||
|
|
||||||
|
return Settings.builder()
|
||||||
|
.put(super.nodeSettings(nodeOrdinal))
|
||||||
|
.put("xpack.security.audit.index.settings.index.number_of_shards", numShards)
|
||||||
|
.put("xpack.security.audit.index.settings.index.number_of_replicas", numReplicas)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void initializeRemoteClusterIfNecessary() throws Exception {
|
public void initializeRemoteClusterIfNecessary() throws Exception {
|
||||||
if (remoteIndexing == false) {
|
if (remoteIndexing == false) {
|
||||||
|
@ -149,6 +165,8 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
|
||||||
public Settings nodeSettings(int nodeOrdinal) {
|
public Settings nodeSettings(int nodeOrdinal) {
|
||||||
Settings.Builder builder = Settings.builder()
|
Settings.Builder builder = Settings.builder()
|
||||||
.put(super.nodeSettings(nodeOrdinal))
|
.put(super.nodeSettings(nodeOrdinal))
|
||||||
|
.put("xpack.security.audit.index.settings.index.number_of_shards", numShards)
|
||||||
|
.put("xpack.security.audit.index.settings.index.number_of_replicas", numReplicas)
|
||||||
// Disable native ML autodetect_process as the c++ controller won't be available
|
// Disable native ML autodetect_process as the c++ controller won't be available
|
||||||
.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false)
|
.put(MachineLearning.AUTODETECT_PROCESS.getKey(), false)
|
||||||
.put(XPackSettings.SECURITY_ENABLED.getKey(), useSecurity);
|
.put(XPackSettings.SECURITY_ENABLED.getKey(), useSecurity);
|
||||||
|
@ -283,8 +301,6 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
|
||||||
|
|
||||||
private void initialize(String[] includes, String[] excludes) throws Exception {
|
private void initialize(String[] includes, String[] excludes) throws Exception {
|
||||||
rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY);
|
rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY);
|
||||||
numReplicas = numberOfReplicas();
|
|
||||||
numShards = numberOfShards();
|
|
||||||
includeRequestBody = randomBoolean();
|
includeRequestBody = randomBoolean();
|
||||||
Settings.Builder builder = Settings.builder();
|
Settings.Builder builder = Settings.builder();
|
||||||
if (remoteIndexing) {
|
if (remoteIndexing) {
|
||||||
|
@ -327,7 +343,7 @@ public class IndexAuditTrailTests extends SecurityIntegTestCase {
|
||||||
return Arrays.asList(XPackPlugin.class, getTestTransportPlugin());
|
return Arrays.asList(XPackPlugin.class, getTestTransportPlugin());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
auditor.start(true);
|
auditor.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAnonymousAccessDeniedTransport() throws Exception {
|
public void testAnonymousAccessDeniedTransport() throws Exception {
|
||||||
|
|
|
@ -1,96 +0,0 @@
|
||||||
/*
|
|
||||||
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
|
|
||||||
* or more contributor license agreements. Licensed under the Elastic License;
|
|
||||||
* you may not use this file except in compliance with the Elastic License.
|
|
||||||
*/
|
|
||||||
package org.elasticsearch.xpack.security.audit.index;
|
|
||||||
|
|
||||||
import java.util.Locale;
|
|
||||||
|
|
||||||
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
|
|
||||||
import org.elasticsearch.cluster.node.DiscoveryNode;
|
|
||||||
import org.elasticsearch.cluster.service.ClusterService;
|
|
||||||
import org.elasticsearch.common.settings.Settings;
|
|
||||||
import org.elasticsearch.test.ESIntegTestCase;
|
|
||||||
import org.elasticsearch.test.SecurityIntegTestCase;
|
|
||||||
import org.elasticsearch.test.rest.FakeRestRequest;
|
|
||||||
import org.elasticsearch.threadpool.TestThreadPool;
|
|
||||||
import org.elasticsearch.threadpool.ThreadPool;
|
|
||||||
import org.elasticsearch.xpack.security.audit.index.IndexAuditTrail.State;
|
|
||||||
import org.junit.After;
|
|
||||||
|
|
||||||
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.DAILY;
|
|
||||||
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.HOURLY;
|
|
||||||
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.MONTHLY;
|
|
||||||
import static org.elasticsearch.xpack.security.audit.index.IndexNameResolver.Rollover.WEEKLY;
|
|
||||||
import static org.hamcrest.Matchers.notNullValue;
|
|
||||||
import static org.hamcrest.Matchers.nullValue;
|
|
||||||
import static org.mockito.Mockito.mock;
|
|
||||||
import static org.mockito.Mockito.when;
|
|
||||||
|
|
||||||
/**
|
|
||||||
* This test checks to make sure that the index audit trail actually updates the mappings on startups
|
|
||||||
*/
|
|
||||||
public class IndexAuditTrailUpdateMappingTests extends SecurityIntegTestCase {
|
|
||||||
private ThreadPool threadPool;
|
|
||||||
private IndexAuditTrail auditor;
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void setUp() throws Exception {
|
|
||||||
super.setUp();
|
|
||||||
threadPool = new TestThreadPool("index audit trail update mapping tests");
|
|
||||||
}
|
|
||||||
|
|
||||||
public void testMappingIsUpdated() throws Exception {
|
|
||||||
// Setup
|
|
||||||
IndexNameResolver.Rollover rollover = randomFrom(HOURLY, DAILY, WEEKLY, MONTHLY);
|
|
||||||
Settings settings = Settings.builder().put("xpack.security.audit.index.rollover", rollover.name().toLowerCase(Locale.ENGLISH))
|
|
||||||
.put("path.home", createTempDir()).build();
|
|
||||||
DiscoveryNode localNode = mock(DiscoveryNode.class);
|
|
||||||
when(localNode.getHostAddress()).thenReturn(buildNewFakeTransportAddress().toString());
|
|
||||||
ClusterService clusterService = mock(ClusterService.class);
|
|
||||||
when(clusterService.localNode()).thenReturn(localNode);
|
|
||||||
auditor = new IndexAuditTrail(settings, internalSecurityClient(), threadPool, clusterService);
|
|
||||||
|
|
||||||
// before starting we add an event
|
|
||||||
auditor.authenticationFailed(new FakeRestRequest());
|
|
||||||
IndexAuditTrail.Message message = auditor.peek();
|
|
||||||
|
|
||||||
// resolve the index name and force create it
|
|
||||||
final String indexName = IndexNameResolver.resolve(IndexAuditTrail.INDEX_NAME_PREFIX, message.timestamp, rollover);
|
|
||||||
client().admin().indices().prepareCreate(indexName).get();
|
|
||||||
ensureGreen(indexName);
|
|
||||||
|
|
||||||
// default mapping
|
|
||||||
GetMappingsResponse response = client().admin().indices().prepareGetMappings(indexName).get();
|
|
||||||
assertThat(response.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), nullValue());
|
|
||||||
|
|
||||||
// start the audit trail which should update the mappings since it is the master
|
|
||||||
auditor.start(true);
|
|
||||||
assertTrue(awaitBusy(() -> auditor.state() == State.STARTED));
|
|
||||||
|
|
||||||
// get the updated mappings
|
|
||||||
GetMappingsResponse updated = client().admin().indices().prepareGetMappings(indexName).get();
|
|
||||||
assertThat(updated.mappings().get(indexName).get(IndexAuditTrail.DOC_TYPE), notNullValue());
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void beforeIndexDeletion() {
|
|
||||||
// no-op here because of the shard counter check
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* We need to use our own method instead of {@link ESIntegTestCase#tearDown()} since checks are run against the cluster before the
|
|
||||||
* teardown method is called by the {@link ESIntegTestCase#after()} method. If the {@link IndexAuditTrail} is still running and
|
|
||||||
* indexing tests will randomly fail with failing to obtain shard locks for the audit indices.
|
|
||||||
*/
|
|
||||||
@After
|
|
||||||
public void cleanUp() throws Exception {
|
|
||||||
if (auditor != null) {
|
|
||||||
auditor.stop();
|
|
||||||
}
|
|
||||||
if (threadPool != null) {
|
|
||||||
terminate(threadPool);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
|
@ -37,15 +37,14 @@ public class IndexAuditIT extends ESIntegTestCase {
|
||||||
private static final String USER = "test_user";
|
private static final String USER = "test_user";
|
||||||
private static final String PASS = "x-pack-test-password";
|
private static final String PASS = "x-pack-test-password";
|
||||||
|
|
||||||
@AwaitsFix(bugUrl = "https://github.com/elastic/x-plugins/issues/2354")
|
public void testIndexAuditTrailWorking() throws Exception {
|
||||||
public void testShieldIndexAuditTrailWorking() throws Exception {
|
|
||||||
Response response = getRestClient().performRequest("GET", "/",
|
Response response = getRestClient().performRequest("GET", "/",
|
||||||
new BasicHeader(UsernamePasswordToken.BASIC_AUTH_HEADER,
|
new BasicHeader(UsernamePasswordToken.BASIC_AUTH_HEADER,
|
||||||
UsernamePasswordToken.basicAuthHeaderValue(USER, new SecureString(PASS.toCharArray()))));
|
UsernamePasswordToken.basicAuthHeaderValue(USER, new SecureString(PASS.toCharArray()))));
|
||||||
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
assertThat(response.getStatusLine().getStatusCode(), is(200));
|
||||||
final AtomicReference<ClusterState> lastClusterState = new AtomicReference<>();
|
final AtomicReference<ClusterState> lastClusterState = new AtomicReference<>();
|
||||||
final AtomicBoolean indexExists = new AtomicBoolean(false);
|
final AtomicBoolean indexExists = new AtomicBoolean(false);
|
||||||
boolean found = awaitBusy(() -> {
|
final boolean found = awaitBusy(() -> {
|
||||||
if (indexExists.get() == false) {
|
if (indexExists.get() == false) {
|
||||||
ClusterState state = client().admin().cluster().prepareState().get().getState();
|
ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||||
lastClusterState.set(state);
|
lastClusterState.set(state);
|
||||||
|
@ -63,22 +62,23 @@ public class IndexAuditIT extends ESIntegTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
ensureYellow(".security_audit_log*");
|
ensureYellow(".security_audit_log*");
|
||||||
|
logger.info("security audit log index is yellow");
|
||||||
ClusterState state = client().admin().cluster().prepareState().get().getState();
|
ClusterState state = client().admin().cluster().prepareState().get().getState();
|
||||||
lastClusterState.set(state);
|
lastClusterState.set(state);
|
||||||
client().admin().indices().prepareRefresh().get();
|
|
||||||
|
logger.info("refreshing audit indices");
|
||||||
|
client().admin().indices().prepareRefresh(".security_audit_log*").get();
|
||||||
|
logger.info("refreshed audit indices");
|
||||||
return client().prepareSearch(".security_audit_log*").setQuery(QueryBuilders.matchQuery("principal", USER))
|
return client().prepareSearch(".security_audit_log*").setQuery(QueryBuilders.matchQuery("principal", USER))
|
||||||
.get().getHits().getTotalHits() > 0;
|
.get().getHits().getTotalHits() > 0;
|
||||||
}, 10L, TimeUnit.SECONDS);
|
}, 60L, TimeUnit.SECONDS);
|
||||||
|
|
||||||
if (!found) {
|
assertTrue("Did not find security audit index. Current cluster state:\n" + lastClusterState.get().toString(), found);
|
||||||
logger.info("current cluster state: {}", lastClusterState.get());
|
|
||||||
}
|
|
||||||
assertThat(found, is(true));
|
|
||||||
|
|
||||||
SearchResponse searchResponse = client().prepareSearch(".security_audit_log*").setQuery(
|
SearchResponse searchResponse = client().prepareSearch(".security_audit_log*").setQuery(
|
||||||
QueryBuilders.matchQuery("principal", USER)).get();
|
QueryBuilders.matchQuery("principal", USER)).get();
|
||||||
assertThat(searchResponse.getHits().getHits().length, greaterThan(0));
|
assertThat(searchResponse.getHits().getHits().length, greaterThan(0));
|
||||||
assertThat((String) searchResponse.getHits().getAt(0).getSourceAsMap().get("principal"), is(USER));
|
assertThat(searchResponse.getHits().getAt(0).getSourceAsMap().get("principal"), is(USER));
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testAuditTrailTemplateIsRecreatedAfterDelete() throws Exception {
|
public void testAuditTrailTemplateIsRecreatedAfterDelete() throws Exception {
|
||||||
|
|
Loading…
Reference in New Issue