[ML] Reduce timeout to 5s while indexing notifications

Original commit: elastic/x-pack-elasticsearch@b144f366f4
This commit is contained in:
Dimitrios Athanasiou 2017-04-21 14:57:13 +01:00
parent 61f7adbfc9
commit 73feee6317
2 changed files with 45 additions and 50 deletions

View File

@ -8,10 +8,12 @@ package org.elasticsearch.xpack.ml.notifications;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.message.ParameterizedMessage;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.logging.Loggers;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -46,19 +48,20 @@ public class Auditor {
}
private void indexDoc(String type, ToXContent toXContent) {
client.prepareIndex(NOTIFICATIONS_INDEX, type)
.setSource(toXContentBuilder(toXContent))
.execute(new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
LOGGER.trace("Successfully persisted {}", type);
}
IndexRequest indexRequest = new IndexRequest(NOTIFICATIONS_INDEX, type);
indexRequest.source(toXContentBuilder(toXContent));
indexRequest.timeout(TimeValue.timeValueSeconds(5));
client.index(indexRequest, new ActionListener<IndexResponse>() {
@Override
public void onResponse(IndexResponse indexResponse) {
LOGGER.trace("Successfully persisted {}", type);
}
@Override
public void onFailure(Exception e) {
LOGGER.debug(new ParameterizedMessage("Error writing {}", new Object[]{type}, e));
}
});
@Override
public void onFailure(Exception e) {
LOGGER.debug(new ParameterizedMessage("Error writing {}", new Object[]{type}, e));
}
});
}
private XContentBuilder toXContentBuilder(ToXContent toXContent) {

View File

@ -5,14 +5,13 @@
*/
package org.elasticsearch.xpack.ml.notifications;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Client;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.test.ESTestCase;
@ -21,18 +20,16 @@ import org.mockito.ArgumentCaptor;
import java.io.IOException;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
public class AuditorTests extends ESTestCase {
private Client client;
private ClusterService clusterService;
private ListenableActionFuture<IndexResponse> indexResponse;
private ArgumentCaptor<String> indexCaptor;
private ArgumentCaptor<String> typeCaptor;
private ArgumentCaptor<XContentBuilder> jsonCaptor;
private ArgumentCaptor<IndexRequest> indexRequestCaptor;
@SuppressWarnings("unchecked")
@Before
public void setUpMocks() {
client = mock(Client.class);
@ -41,61 +38,56 @@ public class AuditorTests extends ESTestCase {
when(dNode.getName()).thenReturn("this_node_has_a_name");
when(clusterService.localNode()).thenReturn(dNode);
indexResponse = mock(ListenableActionFuture.class);
indexCaptor = ArgumentCaptor.forClass(String.class);
typeCaptor = ArgumentCaptor.forClass(String.class);
jsonCaptor = ArgumentCaptor.forClass(XContentBuilder.class);
indexRequestCaptor = ArgumentCaptor.forClass(IndexRequest.class);
}
public void testInfo() throws IOException {
givenClientPersistsSuccessfully();
Auditor auditor = new Auditor(client, clusterService);
auditor.info("foo", "Here is my info");
assertEquals(".ml-notifications", indexCaptor.getValue());
assertEquals("audit_message", typeCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();
verify(client).index(indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertArrayEquals(new String[] {".ml-notifications"}, indexRequest.indices());
assertEquals("audit_message", indexRequest.type());
assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout());
AuditMessage auditMessage = parseAuditMessage(indexRequest.source());
assertEquals("foo", auditMessage.getJobId());
assertEquals("Here is my info", auditMessage.getMessage());
assertEquals(Level.INFO, auditMessage.getLevel());
}
public void testWarning() throws IOException {
givenClientPersistsSuccessfully();
Auditor auditor = new Auditor(client, clusterService);
auditor.warning("bar", "Here is my warning");
assertEquals(".ml-notifications", indexCaptor.getValue());
assertEquals("audit_message", typeCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();
verify(client).index(indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertArrayEquals(new String[] {".ml-notifications"}, indexRequest.indices());
assertEquals("audit_message", indexRequest.type());
assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout());
AuditMessage auditMessage = parseAuditMessage(indexRequest.source());
assertEquals("bar", auditMessage.getJobId());
assertEquals("Here is my warning", auditMessage.getMessage());
assertEquals(Level.WARNING, auditMessage.getLevel());
}
public void testError() throws IOException {
givenClientPersistsSuccessfully();
Auditor auditor = new Auditor(client, clusterService);
auditor.error("foobar", "Here is my error");
assertEquals(".ml-notifications", indexCaptor.getValue());
assertEquals("audit_message", typeCaptor.getValue());
AuditMessage auditMessage = parseAuditMessage();
verify(client).index(indexRequestCaptor.capture(), any());
IndexRequest indexRequest = indexRequestCaptor.getValue();
assertArrayEquals(new String[] {".ml-notifications"}, indexRequest.indices());
assertEquals("audit_message", indexRequest.type());
assertEquals(TimeValue.timeValueSeconds(5), indexRequest.timeout());
AuditMessage auditMessage = parseAuditMessage(indexRequest.source());
assertEquals("foobar", auditMessage.getJobId());
assertEquals("Here is my error", auditMessage.getMessage());
assertEquals(Level.ERROR, auditMessage.getLevel());
}
private void givenClientPersistsSuccessfully() {
IndexRequestBuilder indexRequestBuilder = mock(IndexRequestBuilder.class);
when(indexRequestBuilder.setSource(jsonCaptor.capture())).thenReturn(indexRequestBuilder);
when(indexRequestBuilder.execute()).thenReturn(indexResponse);
when(client.prepareIndex(indexCaptor.capture(), typeCaptor.capture()))
.thenReturn(indexRequestBuilder);
when(client.prepareIndex(indexCaptor.capture(), typeCaptor.capture()))
.thenReturn(indexRequestBuilder);
}
private AuditMessage parseAuditMessage() throws IOException {
String json = jsonCaptor.getValue().string();
XContentParser parser = XContentFactory.xContent(json).createParser(NamedXContentRegistry.EMPTY, json);
private AuditMessage parseAuditMessage(BytesReference msg) throws IOException {
XContentParser parser = XContentFactory.xContent(msg).createParser(NamedXContentRegistry.EMPTY, msg);
return AuditMessage.PARSER.apply(parser, null);
}
}