Make auditor index messages asyn in the background, the auditor will not let the calling thread wait.

Also collapsed the the interface into the only implementation.

Original commit: elastic/x-pack-elasticsearch@c68f77627e
This commit is contained in:
Martijn van Groningen 2016-11-25 16:29:39 +01:00
parent e5e039973e
commit c387a3636d
7 changed files with 118 additions and 255 deletions

View File

@ -12,8 +12,8 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.xpack.prelert.utils.time.TimeUtils;
@ -21,8 +21,7 @@ import java.io.IOException;
import java.util.Date;
import java.util.Objects;
public class AuditActivity extends ToXContentToBytes implements Writeable
{
public class AuditActivity extends ToXContentToBytes implements Writeable {
public static final ParseField TYPE = new ParseField("auditActivity");
public static final ParseField TOTAL_JOBS = new ParseField("totalJobs");
@ -58,8 +57,7 @@ public class AuditActivity extends ToXContentToBytes implements Writeable
public AuditActivity() {
}
private AuditActivity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors)
{
private AuditActivity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors) {
this.totalJobs = totalJobs;
this.totalDetectors = totalDetectors;
this.runningJobs = runningJobs;
@ -90,58 +88,47 @@ public class AuditActivity extends ToXContentToBytes implements Writeable
}
}
public int getTotalJobs()
{
public int getTotalJobs() {
return totalJobs;
}
public void setTotalJobs(int totalJobs)
{
public void setTotalJobs(int totalJobs) {
this.totalJobs = totalJobs;
}
public int getTotalDetectors()
{
public int getTotalDetectors() {
return totalDetectors;
}
public void setTotalDetectors(int totalDetectors)
{
public void setTotalDetectors(int totalDetectors) {
this.totalDetectors = totalDetectors;
}
public int getRunningJobs()
{
public int getRunningJobs() {
return runningJobs;
}
public void setRunningJobs(int runningJobs)
{
public void setRunningJobs(int runningJobs) {
this.runningJobs = runningJobs;
}
public int getRunningDetectors()
{
public int getRunningDetectors() {
return runningDetectors;
}
public void setRunningDetectors(int runningDetectors)
{
public void setRunningDetectors(int runningDetectors) {
this.runningDetectors = runningDetectors;
}
public Date getTimestamp()
{
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp)
{
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public static AuditActivity newActivity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors)
{
public static AuditActivity newActivity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors) {
return new AuditActivity(totalJobs, totalDetectors, runningJobs, runningDetectors);
}
@ -158,12 +145,12 @@ public class AuditActivity extends ToXContentToBytes implements Writeable
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(totalDetectors, totalJobs, runningDetectors, runningJobs, timestamp);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {

View File

@ -12,9 +12,9 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.ObjectParser.ValueType;
import org.elasticsearch.common.xcontent.XContentParser.Token;
import org.elasticsearch.xpack.prelert.job.Job;
import org.elasticsearch.xpack.prelert.utils.time.TimeUtils;
@ -23,8 +23,7 @@ import java.io.IOException;
import java.util.Date;
import java.util.Objects;
public class AuditMessage extends ToXContentToBytes implements Writeable
{
public class AuditMessage extends ToXContentToBytes implements Writeable {
public static final ParseField TYPE = new ParseField("auditMessage");
public static final ParseField MESSAGE = new ParseField("message");
@ -58,13 +57,11 @@ public class AuditMessage extends ToXContentToBytes implements Writeable
private Level level;
private Date timestamp;
public AuditMessage()
{
public AuditMessage() {
// Default constructor
}
private AuditMessage(String jobId, String message, Level severity)
{
private AuditMessage(String jobId, String message, Level severity) {
this.jobId = jobId;
this.message = message;
level = severity;
@ -98,63 +95,51 @@ public class AuditMessage extends ToXContentToBytes implements Writeable
}
}
public String getJobId()
{
public String getJobId() {
return jobId;
}
public void setJobId(String jobId)
{
public void setJobId(String jobId) {
this.jobId = jobId;
}
public String getMessage()
{
public String getMessage() {
return message;
}
public void setMessage(String message)
{
public void setMessage(String message) {
this.message = message;
}
public Level getLevel()
{
public Level getLevel() {
return level;
}
public void setLevel(Level level)
{
public void setLevel(Level level) {
this.level = level;
}
public Date getTimestamp()
{
public Date getTimestamp() {
return timestamp;
}
public void setTimestamp(Date timestamp)
{
public void setTimestamp(Date timestamp) {
this.timestamp = timestamp;
}
public static AuditMessage newInfo(String jobId, String message)
{
public static AuditMessage newInfo(String jobId, String message) {
return new AuditMessage(jobId, message, Level.INFO);
}
public static AuditMessage newWarning(String jobId, String message)
{
public static AuditMessage newWarning(String jobId, String message) {
return new AuditMessage(jobId, message, Level.WARNING);
}
public static AuditMessage newActivity(String jobId, String message)
{
public static AuditMessage newActivity(String jobId, String message) {
return new AuditMessage(jobId, message, Level.ACTIVITY);
}
public static AuditMessage newError(String jobId, String message)
{
public static AuditMessage newError(String jobId, String message) {
return new AuditMessage(jobId, message, Level.ERROR);
}
@ -176,12 +161,12 @@ public class AuditMessage extends ToXContentToBytes implements Writeable
builder.endObject();
return builder;
}
@Override
public int hashCode() {
return Objects.hash(jobId, message, level, timestamp);
}
@Override
public boolean equals(Object obj) {
if (obj == null) {

View File

@ -5,11 +5,76 @@
*/
package org.elasticsearch.xpack.prelert.job.audit;
public interface Auditor
{
void info(String message);
void warning(String message);
void error(String message);
void activity(String message);
void activity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors);
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class Auditor {
private static final Logger LOGGER = Loggers.getLogger(Auditor.class);
private final Client client;
private final String index;
private final String jobId;
public Auditor(Client client, String index, String jobId) {
this.client = Objects.requireNonNull(client);
this.index = index;
this.jobId = jobId;
}
public void info(String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newInfo(jobId, message));
}
public void warning(String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newWarning(jobId, message));
}
public void error(String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newError(jobId, message));
}
public void activity(String message) {
indexDoc(AuditMessage.TYPE.getPreferredName(), AuditMessage.newActivity(jobId, message));
}
public void activity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors) {
String type = AuditActivity.TYPE.getPreferredName();
indexDoc(type, AuditActivity.newActivity(totalJobs, totalDetectors, runningJobs, runningDetectors));
}
private void indexDoc(String type, ToXContent toXContent) {
client.prepareIndex(index, AuditActivity.TYPE.getPreferredName())
.setSource(toXContentBuilder(toXContent))
.execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
LOGGER.trace("Successfully persisted {}", type);
}
@Override
public void onFailure(Exception e) {
LOGGER.error(new ParameterizedMessage("Error writing {}", new Object[]{true}, e));
}
});
}
private XContentBuilder toXContentBuilder(ToXContent toXContent) {
try {
return toXContent.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
}

View File

@ -179,9 +179,7 @@ public class JobManager extends AbstractComponent {
jobProvider.createJobRelatedIndices(job, new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean indicesCreated) {
// NORELEASE: make auditing async too (we can't do
// blocking stuff here):
// audit(jobDetails.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED));
audit(job.getId()).info(Messages.getMessage(Messages.JOB_AUDIT_CREATED));
// Also I wonder if we need to audit log infra
// structure in prelert as when we merge into xpack
@ -252,11 +250,7 @@ public class JobManager extends AbstractComponent {
public void onResponse(Boolean jobDeleted) {
if (jobDeleted) {
jobProvider.deleteJobRelatedIndices(request.getJobId(), actionListener);
// NORELEASE: This is not the place the audit log
// (indexes a document), because this method is
// executed on the cluster state update task thread and any
// action performed on that thread should be quick.
//audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_DELETED));
audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_DELETED));
} else {
actionListener.onResponse(new DeleteJobAction.Response(false));
}
@ -423,10 +417,6 @@ public class JobManager extends AbstractComponent {
return jobProvider.audit(jobId);
}
public Auditor systemAudit() {
return jobProvider.audit("");
}
public void revertSnapshot(RevertModelSnapshotAction.Request request, ActionListener<RevertModelSnapshotAction.Response> actionListener,
ModelSnapshot modelSnapshot) {
@ -436,16 +426,9 @@ public class JobManager extends AbstractComponent {
@Override
protected RevertModelSnapshotAction.Response newResponse(boolean acknowledged) {
if (acknowledged) {
audit(request.getJobId())
.info(Messages.getMessage(Messages.JOB_AUDIT_REVERTED, modelSnapshot.getDescription()));
return new RevertModelSnapshotAction.Response(modelSnapshot);
// NORELEASE: This is not the place the audit log
// (indexes a document), because this method is
// executed on the cluster state update task thread
// and any action performed on that thread should be
// quick. (so no indexing documents)
// audit(jobId).info(Messages.getMessage(Messages.JOB_AUDIT_REVERTED,
// modelSnapshot.getDescription()));
}
throw new IllegalStateException("Could not revert modelSnapshot on job ["
+ request.getJobId() + "], not acknowledged by master.");

View File

@ -1,105 +0,0 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.xpack.prelert.job.audit.AuditActivity;
import org.elasticsearch.xpack.prelert.job.audit.AuditMessage;
import org.elasticsearch.xpack.prelert.job.audit.Auditor;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
public class ElasticsearchAuditor implements Auditor
{
private static final Logger LOGGER = Loggers.getLogger(ElasticsearchAuditor.class);
private final Client client;
private final String index;
private final String jobId;
public ElasticsearchAuditor(Client client, String index, String jobId)
{
this.client = Objects.requireNonNull(client);
this.index = index;
this.jobId = jobId;
}
@Override
public void info(String message)
{
persistAuditMessage(AuditMessage.newInfo(jobId, message));
}
@Override
public void warning(String message)
{
persistAuditMessage(AuditMessage.newWarning(jobId, message));
}
@Override
public void error(String message)
{
persistAuditMessage(AuditMessage.newError(jobId, message));
}
@Override
public void activity(String message)
{
persistAuditMessage(AuditMessage.newActivity(jobId, message));
}
@Override
public void activity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors)
{
persistAuditActivity(AuditActivity.newActivity(totalJobs, totalDetectors, runningJobs, runningDetectors));
}
private void persistAuditMessage(AuditMessage message)
{
try
{
client.prepareIndex(index, AuditMessage.TYPE.getPreferredName())
.setSource(serialiseMessage(message))
.execute().actionGet();
}
catch (IOException | IndexNotFoundException e)
{
LOGGER.error("Error writing auditMessage", e);
}
}
private void persistAuditActivity(AuditActivity activity)
{
try
{
client.prepareIndex(index, AuditActivity.TYPE.getPreferredName())
.setSource(serialiseActivity(activity))
.execute().actionGet();
}
catch (IOException | IndexNotFoundException e)
{
LOGGER.error("Error writing auditActivity", e);
}
}
private XContentBuilder serialiseMessage(AuditMessage message) throws IOException
{
return message.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS);
}
private XContentBuilder serialiseActivity(AuditActivity activity) throws IOException
{
return activity.toXContent(jsonBuilder(), ToXContent.EMPTY_PARAMS);
}
}

View File

@ -1021,30 +1021,7 @@ public class ElasticsearchJobProvider implements JobProvider {
@Override
public Auditor audit(String jobId) {
// NORELEASE Create proper auditor or remove
// return new ElasticsearchAuditor(client, PRELERT_INFO_INDEX, jobId);
return new Auditor() {
@Override
public void info(String message) {
}
@Override
public void warning(String message) {
}
@Override
public void error(String message) {
}
@Override
public void activity(String message) {
}
@Override
public void activity(int totalJobs, int totalDetectors, int runningJobs, int runningDetectors) {
}
};
return new Auditor(client, JobResultsPersister.getJobIndexName(jobId), jobId);
}
private String esSortField(String sortField) {

View File

@ -3,7 +3,7 @@
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.prelert.job.persistence;
package org.elasticsearch.xpack.prelert.job.audit;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.when;
@ -18,16 +18,12 @@ import org.elasticsearch.common.ParseFieldMatcher;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.IndexNotFoundException;
import org.elasticsearch.test.ESTestCase;
import org.junit.Before;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;
import org.elasticsearch.xpack.prelert.job.audit.AuditActivity;
import org.elasticsearch.xpack.prelert.job.audit.AuditMessage;
import org.elasticsearch.xpack.prelert.job.audit.Level;
public class ElasticsearchAuditorTests extends ESTestCase {
public class AuditorTests extends ESTestCase {
private Client client;
private ListenableActionFuture<IndexResponse> indexResponse;
private ArgumentCaptor<String> indexCaptor;
@ -42,13 +38,10 @@ public class ElasticsearchAuditorTests extends ESTestCase {
jsonCaptor = ArgumentCaptor.forClass(XContentBuilder.class);
}
public void testInfo() {
givenClientPersistsSuccessfully();
ElasticsearchAuditor auditor = new ElasticsearchAuditor(client, "prelert-int", "foo");
Auditor auditor = new Auditor(client, "prelert-int", "foo");
auditor.info("Here is my info");
assertEquals("prelert-int", indexCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();
assertEquals("foo", auditMessage.getJobId());
@ -56,13 +49,10 @@ public class ElasticsearchAuditorTests extends ESTestCase {
assertEquals(Level.INFO, auditMessage.getLevel());
}
public void testWarning() {
givenClientPersistsSuccessfully();
ElasticsearchAuditor auditor = new ElasticsearchAuditor(client, "someIndex", "bar");
Auditor auditor = new Auditor(client, "someIndex", "bar");
auditor.warning("Here is my warning");
assertEquals("someIndex", indexCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();
assertEquals("bar", auditMessage.getJobId());
@ -70,13 +60,10 @@ public class ElasticsearchAuditorTests extends ESTestCase {
assertEquals(Level.WARNING, auditMessage.getLevel());
}
public void testError() {
givenClientPersistsSuccessfully();
ElasticsearchAuditor auditor = new ElasticsearchAuditor(client, "someIndex", "foobar");
Auditor auditor = new Auditor(client, "someIndex", "foobar");
auditor.error("Here is my error");
assertEquals("someIndex", indexCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();
assertEquals("foobar", auditMessage.getJobId());
@ -84,13 +71,10 @@ public class ElasticsearchAuditorTests extends ESTestCase {
assertEquals(Level.ERROR, auditMessage.getLevel());
}
public void testActivity_GivenString() {
givenClientPersistsSuccessfully();
ElasticsearchAuditor auditor = new ElasticsearchAuditor(client, "someIndex", "");
Auditor auditor = new Auditor(client, "someIndex", "");
auditor.activity("Here is my activity");
assertEquals("someIndex", indexCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();
assertEquals("", auditMessage.getJobId());
@ -98,13 +82,10 @@ public class ElasticsearchAuditorTests extends ESTestCase {
assertEquals(Level.ACTIVITY, auditMessage.getLevel());
}
public void testActivity_GivenNumbers() {
givenClientPersistsSuccessfully();
ElasticsearchAuditor auditor = new ElasticsearchAuditor(client, "someIndex", "");
Auditor auditor = new Auditor(client, "someIndex", "");
auditor.activity(10, 100, 5, 50);
assertEquals("someIndex", indexCaptor.getValue());
AuditActivity auditActivity = parseAuditActivity();
assertEquals(10, auditActivity.getTotalJobs());
@ -113,16 +94,6 @@ public class ElasticsearchAuditorTests extends ESTestCase {
assertEquals(50, auditActivity.getRunningDetectors());
}
public void testError_GivenNoSuchIndex() {
when(client.prepareIndex("someIndex", "auditMessage"))
.thenThrow(new IndexNotFoundException("someIndex"));
ElasticsearchAuditor auditor = new ElasticsearchAuditor(client, "someIndex", "foobar");
auditor.error("Here is my error");
}
private void givenClientPersistsSuccessfully() {
IndexRequestBuilder indexRequestBuilder = Mockito.mock(IndexRequestBuilder.class);
when(indexRequestBuilder.setSource(jsonCaptor.capture())).thenReturn(indexRequestBuilder);