Tests: added tests for AlertStore, AlertService and AlertLockService

Changed ClientProxy to be return responses instead of ActionFutures and removed builders. This helps with mocking.

Original commit: elastic/x-pack-elasticsearch@bfc36d9405
This commit is contained in:
Martijn van Groningen 2015-03-17 17:27:40 -07:00
parent e6445a9d2e
commit 2377d1525b
10 changed files with 613 additions and 93 deletions

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import java.util.concurrent.atomic.AtomicBoolean;
@ -15,10 +16,13 @@ import java.util.concurrent.atomic.AtomicBoolean;
public class AlertLockService {
private final KeyedLock<String> alertLock = new KeyedLock<>();
private AtomicBoolean running = new AtomicBoolean();
private final AtomicBoolean running = new AtomicBoolean(false);
public Lock acquire(String name) {
if (!running.get()) {
throw new ElasticsearchIllegalStateException("not started");
}
alertLock.acquire(name);
return new Lock(name, alertLock);
}
@ -38,7 +42,7 @@ public class AlertLockService {
// ongoing operations to complete. Resulting in once the alert service starts again that more than
// expected alert action entries are processed.
//
// Note: new operations will fail now because the state has been set to: stopping
// Note: new operations will fail now because the running has been set to false
while (alertLock.hasLockedKeys()) {
try {
Thread.sleep(100);
@ -48,6 +52,10 @@ public class AlertLockService {
}
}
KeyedLock<String> getAlertLock() {
return alertLock;
}
public static class Lock {
private final String name;

View File

@ -13,8 +13,7 @@ import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.*;
import org.elasticsearch.alerts.support.Callback;
import org.elasticsearch.alerts.support.TemplateUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
@ -31,10 +30,10 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.threadpool.ThreadPool;
import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@ -93,7 +92,7 @@ public class AlertsStore extends AbstractComponent {
if (state.routingTable().index(ALERT_INDEX).allPrimaryShardsActive()) {
logger.debug("alerts index [{}] found with all active primary shards. loading alerts...", ALERT_INDEX);
try {
int count = loadAlerts(client, scrollSize, scrollTimeout, alertIndexMetaData.numberOfShards(), alertParser, alertMap);
int count = loadAlerts(alertIndexMetaData.numberOfShards());
logger.debug("loaded [{}] alerts from the alert index [{}]", count, ALERT_INDEX);
} catch (Exception e) {
logger.debug("failed to load alerts for alert index [{}]. scheduled to retry alert loading...", e, ALERT_INDEX);
@ -226,42 +225,43 @@ public class AlertsStore extends AbstractComponent {
* scrolls all the alert documents in the alerts index, parses them, and loads them into
* the given map.
*/
static int loadAlerts(ClientProxy client, int scrollSize, TimeValue scrollTimeout, int numPrimaryShards, Alert.Parser parser, Map<String, Alert> alerts) {
assert alerts.isEmpty() : "no alerts should reside, but there are [" + alerts.size() + "] alerts.";
RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_INDEX)).actionGet();
int loadAlerts(int numPrimaryShards) {
assert alertMap.isEmpty() : "no alerts should reside, but there are [" + alertMap.size() + "] alerts.";
RefreshResponse refreshResponse = client.refresh(new RefreshRequest(ALERT_INDEX));
if (refreshResponse.getSuccessfulShards() < numPrimaryShards) {
throw new AlertsException("not all required shards have been refreshed");
}
int count = 0;
SearchResponse response = client.prepareSearch(ALERT_INDEX)
.setTypes(ALERT_TYPE)
.setPreference("_primary")
.setSearchType(SearchType.SCAN)
.setScroll(scrollTimeout)
.setSize(scrollSize)
.setVersion(true)
.get();
SearchRequest searchRequest = new SearchRequest(ALERT_INDEX)
.types(ALERT_TYPE)
.preference("_primary")
.searchType(SearchType.SCAN)
.scroll(scrollTimeout)
.source(new SearchSourceBuilder()
.size(scrollSize)
.version(true));
SearchResponse response = client.search(searchRequest);
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {
throw new ElasticsearchException("Partial response while loading alerts");
}
if (response.getHits().getTotalHits() > 0) {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
response = client.searchScroll(response.getScrollId(), scrollTimeout);
while (response.getHits().hits().length != 0) {
for (SearchHit hit : response.getHits()) {
String name = hit.getId();
Alert alert = parser.parse(name, true, hit.getSourceRef());
Alert alert = alertParser.parse(name, true, hit.getSourceRef());
alert.status().version(hit.version());
alerts.put(name, alert);
alertMap.put(name, alert);
count++;
}
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
response = client.searchScroll(response.getScrollId(), scrollTimeout);
}
}
} finally {
client.prepareClearScroll().addScrollId(response.getScrollId()).get();
client.clearScroll(response.getScrollId());
}
return count;
}
@ -272,7 +272,7 @@ public class AlertsStore extends AbstractComponent {
}
}
public final class AlertPut {
public class AlertPut {
private final Alert previous;
private final Alert current;
@ -297,7 +297,7 @@ public class AlertsStore extends AbstractComponent {
}
}
public final class AlertDelete {
public class AlertDelete {
private final DeleteResponse response;

View File

@ -9,9 +9,7 @@ import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.*;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.alerts.support.TemplateUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
@ -109,13 +107,13 @@ public class HistoryStore extends AbstractComponent {
}
}
RefreshResponse refreshResponse = client.admin().indices().refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*")).actionGet();
RefreshResponse refreshResponse = client.refresh(new RefreshRequest(ALERT_HISTORY_INDEX_PREFIX + "*"));
if (refreshResponse.getSuccessfulShards() < numPrimaryShards) {
return new LoadResult(false);
}
SearchRequest searchRequest = createScanSearchRequest(firedAlertState);
SearchResponse response = client.search(searchRequest).actionGet();
SearchResponse response = client.search(searchRequest);
List<FiredAlert> alerts = new ArrayList<>();
try {
if (response.getTotalShards() != response.getSuccessfulShards()) {
@ -123,7 +121,7 @@ public class HistoryStore extends AbstractComponent {
}
if (response.getHits().getTotalHits() > 0) {
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
response = client.searchScroll(response.getScrollId(), scrollTimeout);
while (response.getHits().hits().length != 0) {
for (SearchHit sh : response.getHits()) {
String historyId = sh.getId();
@ -132,11 +130,11 @@ public class HistoryStore extends AbstractComponent {
logger.debug("loaded fired alert from index [{}/{}/{}]", sh.index(), sh.type(), sh.id());
alerts.add(historyEntry);
}
response = client.prepareSearchScroll(response.getScrollId()).setScroll(scrollTimeout).get();
response = client.searchScroll(response.getScrollId(), scrollTimeout);
}
}
} finally {
client.prepareClearScroll().addScrollId(response.getScrollId()).get();
client.clearScroll(response.getScrollId());
}
templateUtils.ensureIndexTemplateIsLoaded(state, "alerthistory");
return new LoadResult(true, alerts);

View File

@ -74,7 +74,7 @@ public class SearchInput extends Input<SearchInput.Result> {
}
// actionGet deals properly with InterruptedException
SearchResponse response = client.search(request).actionGet();
SearchResponse response = client.search(request);
if (logger.isDebugEnabled()) {
logger.debug("[{}] found [{}] hits", ctx.id(), ctx.alert().name(), response.getHits().getTotalHits());
for (SearchHit hit : response.getHits()) {

View File

@ -7,6 +7,8 @@ package org.elasticsearch.alerts.support.init.proxy;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetRequestBuilder;
@ -18,6 +20,7 @@ import org.elasticsearch.alerts.support.init.InitializingService;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.Client;
import org.elasticsearch.common.inject.Injector;
import org.elasticsearch.common.unit.TimeValue;
/**
* A lazily initialized proxy to an elasticsearch {@link Client}. Inject this proxy whenever a client
@ -65,20 +68,23 @@ public class ClientProxy implements InitializingService.Initializable {
return client.prepareGet(index, type, id);
}
public ActionFuture<SearchResponse> search(SearchRequest request) {
return client.search(request);
public SearchResponse search(SearchRequest request) {
return client.search(request).actionGet();
}
public SearchRequestBuilder prepareSearch(String... indices) {
return client.prepareSearch(indices);
public SearchResponse searchScroll(String scrollId, TimeValue timeout) {
SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(timeout);
return client.searchScroll(request).actionGet();
}
public SearchScrollRequestBuilder prepareSearchScroll(String scrollId) {
return client.prepareSearchScroll(scrollId);
public ClearScrollResponse clearScroll(String scrollId) {
ClearScrollRequest request = new ClearScrollRequest();
request.addScrollId(scrollId);
return client.clearScroll(request).actionGet();
}
public ClearScrollRequestBuilder prepareClearScroll() {
return client.prepareClearScroll();
public RefreshResponse refresh(RefreshRequest request) {
return client.admin().indices().refresh(request).actionGet();
}
}

View File

@ -63,7 +63,7 @@ public class SearchTransform extends Transform<SearchTransform.Result> {
@Override
public Result apply(ExecutionContext ctx, Payload payload) throws IOException {
SearchRequest req = createRequest(request, ctx, payload);
SearchResponse resp = client.search(req).actionGet();
SearchResponse resp = client.search(req);
return new Result(TYPE, new Payload.XContent(resp));
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
/**
*/
public class AlertLockServiceTests extends ElasticsearchTestCase {
@Test
public void testLocking_notStarted() {
AlertLockService lockService = new AlertLockService();
try {
lockService.acquire("_name");
fail("exception expected");
} catch (Exception e) {
assertThat(e.getMessage(), equalTo("not started"));
}
}
@Test
public void testLocking() {
AlertLockService lockService = new AlertLockService();
lockService.start();
AlertLockService.Lock lock = lockService.acquire("_name");
assertThat(lockService.getAlertLock().hasLockedKeys(), is(true));
lock.release();
assertThat(lockService.getAlertLock().hasLockedKeys(), is(false));
lockService.stop();
}
@Test
public void testLocking_alreadyHeld() {
AlertLockService lockService = new AlertLockService();
lockService.start();
AlertLockService.Lock lock1 = lockService.acquire("_name");
try {
lockService.acquire("_name");
fail("exception expected");
} catch (ElasticsearchIllegalStateException e) {
assertThat(e.getMessage(), containsString("Lock already acquired"));
}
lock1.release();
lockService.stop();
}
}

View File

@ -0,0 +1,171 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.alerts.history.HistoryService;
import org.elasticsearch.alerts.scheduler.Scheduler;
import org.elasticsearch.alerts.scheduler.schedule.Schedule;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Before;
import org.junit.Test;
import java.lang.reflect.Field;
import java.util.concurrent.atomic.AtomicReference;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.sameInstance;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
/**
*/
public class AlertServiceTests extends ElasticsearchTestCase {
private Scheduler scheduler;
private AlertsStore alertsStore;
private AlertsService alertsService;
private HistoryService historyService;
private AlertLockService alertLockService;
@Before
public void init() throws Exception {
scheduler = mock(Scheduler.class);
alertsStore = mock(AlertsStore.class);
historyService = mock(HistoryService.class);
alertLockService = mock(AlertLockService.class);
alertsService = new AlertsService(ImmutableSettings.EMPTY, scheduler, alertsStore, historyService, alertLockService);
Field field = AlertsService.class.getDeclaredField("state");
field.setAccessible(true);
AtomicReference<AlertsService.State> state = (AtomicReference<AlertsService.State>) field.get(alertsService);
state.set(AlertsService.State.STARTED);
}
@Test
public void testPutAlert() {
IndexResponse indexResponse = mock(IndexResponse.class);
Alert alert = mock(Alert.class);
AlertsStore.AlertPut alertPut = mock(AlertsStore.AlertPut.class);
when(alertPut.indexResponse()).thenReturn(indexResponse);
when(alertPut.current()).thenReturn(alert);
AlertLockService.Lock lock = mock(AlertLockService.Lock.class);
when(alertLockService.acquire(any(String.class))).thenReturn(lock);
when(alertsStore.putAlert(any(String.class), any(BytesReference.class))).thenReturn(alertPut);
IndexResponse response = alertsService.putAlert("_name", new BytesArray("{}"));
assertThat(response, sameInstance(indexResponse));
verify(scheduler, times(1)).add(any(Scheduler.Job.class));
}
@Test
public void testPutAlert_notSchedule() {
Schedule schedule = mock(Schedule.class);
IndexResponse indexResponse = mock(IndexResponse.class);
Alert alert = mock(Alert.class);
when(alert.schedule()).thenReturn(schedule);
AlertsStore.AlertPut alertPut = mock(AlertsStore.AlertPut.class);
when(alertPut.indexResponse()).thenReturn(indexResponse);
when(alertPut.current()).thenReturn(alert);
Alert previousAlert = mock(Alert.class);
when(previousAlert.schedule()).thenReturn(schedule);
when(alertPut.previous()).thenReturn(previousAlert);
AlertLockService.Lock lock = mock(AlertLockService.Lock.class);
when(alertLockService.acquire(any(String.class))).thenReturn(lock);
when(alertsStore.putAlert(any(String.class), any(BytesReference.class))).thenReturn(alertPut);
IndexResponse response = alertsService.putAlert("_name", new BytesArray("{}"));
assertThat(response, sameInstance(indexResponse));
verifyZeroInteractions(scheduler);
}
@Test
public void testDeleteAlert() throws Exception {
AlertLockService.Lock lock = mock(AlertLockService.Lock.class);
when(alertLockService.acquire("_name")).thenReturn(lock);
AlertsStore.AlertDelete expectedAlertDelete = mock(AlertsStore.AlertDelete.class);
DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponse.isFound()).thenReturn(true);
when(expectedAlertDelete.deleteResponse()).thenReturn(deleteResponse);
when(alertsStore.deleteAlert("_name")).thenReturn(expectedAlertDelete);
AlertsStore.AlertDelete alertDelete = alertsService.deleteAlert("_name");
assertThat(alertDelete, sameInstance(expectedAlertDelete));
verify(scheduler, times(1)).remove("_name");
}
@Test
public void testDeleteAlert_notFound() throws Exception {
AlertLockService.Lock lock = mock(AlertLockService.Lock.class);
when(alertLockService.acquire("_name")).thenReturn(lock);
AlertsStore.AlertDelete expectedAlertDelete = mock(AlertsStore.AlertDelete.class);
DeleteResponse deleteResponse = mock(DeleteResponse.class);
when(deleteResponse.isFound()).thenReturn(false);
when(expectedAlertDelete.deleteResponse()).thenReturn(deleteResponse);
when(alertsStore.deleteAlert("_name")).thenReturn(expectedAlertDelete);
AlertsStore.AlertDelete alertDelete = alertsService.deleteAlert("_name");
assertThat(alertDelete, sameInstance(expectedAlertDelete));
verifyZeroInteractions(scheduler);
}
@Test
public void testAckAlert() throws Exception {
AlertLockService.Lock lock = mock(AlertLockService.Lock.class);
when(alertLockService.acquire("_name")).thenReturn(lock);
Alert alert = mock(Alert.class);
when(alert.ack()).thenReturn(true);
Alert.Status status = new Alert.Status();
when(alert.status()).thenReturn(status);
when(alertsStore.getAlert("_name")).thenReturn(alert);
Alert.Status result = alertsService.ackAlert("_name");
assertThat(result, not(sameInstance(status)));
verify(alertsStore, times(1)).updateAlertStatus(alert);
}
@Test
public void testAckAlert_notAck() throws Exception {
AlertLockService.Lock lock = mock(AlertLockService.Lock.class);
when(alertLockService.acquire("_name")).thenReturn(lock);
Alert alert = mock(Alert.class);
when(alert.ack()).thenReturn(false);
Alert.Status status = new Alert.Status();
when(alert.status()).thenReturn(status);
when(alertsStore.getAlert("_name")).thenReturn(alert);
Alert.Status result = alertsService.ackAlert("_name");
assertThat(result, not(sameInstance(status)));
verify(alertsStore, never()).updateAlertStatus(alert);
}
@Test
public void testAckAlert_noAlert() throws Exception {
AlertLockService.Lock lock = mock(AlertLockService.Lock.class);
when(alertLockService.acquire("_name")).thenReturn(lock);
when(alertsStore.getAlert("_name")).thenReturn(null);
try {
alertsService.ackAlert("_name");
fail();
} catch (AlertsException e) {
// expected
}
verify(alertsStore, never()).updateAlertStatus(any(Alert.class));
}
}

View File

@ -0,0 +1,306 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.alerts;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.search.*;
import org.elasticsearch.alerts.support.Callback;
import org.elasticsearch.alerts.support.TemplateUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ClusterStateListener;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.SearchHitField;
import org.elasticsearch.search.internal.InternalSearchHit;
import org.elasticsearch.search.internal.InternalSearchHits;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.Before;
import org.junit.Test;
import java.util.Collections;
import static org.hamcrest.core.Is.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
/**
*/
public class AlertStoreTests extends ElasticsearchTestCase {
private AlertsStore alertsStore;
private ClientProxy clientProxy;
private TemplateUtils templateUtils;
private Alert.Parser parser;
private ClusterService clusterService;
private ThreadPool threadPool;
private Callback<ClusterState> callback;
@Before
public void init() {
clientProxy = mock(ClientProxy.class);
templateUtils = mock(TemplateUtils.class);
parser = mock(Alert.Parser.class);
clusterService = mock(ClusterService.class);
threadPool = mock(ThreadPool.class);
callback = mock(Callback.class);
alertsStore = new AlertsStore(ImmutableSettings.EMPTY, clientProxy, templateUtils, parser, clusterService, threadPool);
}
@Test
public void testStartNoPreviousAlertsIndex() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
csBuilder.metaData(metaDateBuilder);
ClusterState cs = csBuilder.build();
alertsStore.start(cs, callback);
assertThat(alertsStore.started(), is(true));
assertThat(alertsStore.getAlerts().size(), equalTo(0));
verify(callback, times(1)).onSuccess(any(ClusterState.class));
verify(callback, never()).onFailure(any(Throwable.class));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerts");
verifyZeroInteractions(clientProxy);
verifyZeroInteractions(clusterService);
alertsStore.start(cs, callback);
verify(callback, times(2)).onSuccess(any(ClusterState.class));
verify(callback, never()).onFailure(any(Throwable.class));
verifyNoMoreInteractions(templateUtils);
verifyZeroInteractions(clientProxy);
verifyZeroInteractions(clusterService);
}
@Test
public void testStartPrimaryShardNotReady() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false)
.addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.UNASSIGNED, 1))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder);
ClusterState cs = csBuilder.build();
alertsStore.start(cs, callback);
verify(clusterService, timeout(1)).add(any(ClusterStateListener.class));
verifyZeroInteractions(callback);
verifyZeroInteractions(templateUtils);
verifyZeroInteractions(clientProxy);
}
@Test
public void testStartRefreshFailed() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false)
.addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED, 1))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 0);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
ClusterState cs = csBuilder.build();
alertsStore.start(cs, callback);
verify(clusterService, timeout(1)).add(any(ClusterStateListener.class));
verifyZeroInteractions(callback);
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, never()).search(any(SearchRequest.class));
verify(clientProxy, never()).clearScroll(anyString());
}
@Test
public void testStartSearchFailed() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false)
.addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED, 1))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
SearchResponse searchResponse = mockSearchResponse(1, 0, 0);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
ClusterState cs = csBuilder.build();
alertsStore.start(cs, callback);
verify(clusterService, timeout(1)).add(any(ClusterStateListener.class));
verifyZeroInteractions(callback);
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}
@Test
public void testStartNoAlertStored() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false)
.addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED, 1))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
SearchResponse searchResponse = mockSearchResponse(1, 1, 0);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
ClusterState cs = csBuilder.build();
alertsStore.start(cs, callback);
verifyZeroInteractions(clusterService);
assertThat(alertsStore.started(), is(true));
assertThat(alertsStore.getAlerts().size(), equalTo(0));
verify(callback, times(1)).onSuccess(any(ClusterState.class));
verify(callback, never()).onFailure(any(Throwable.class));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerts");
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}
@Test
public void testStartAlertStored() {
ClusterState.Builder csBuilder = new ClusterState.Builder(new ClusterName("_name"));
MetaData.Builder metaDateBuilder = MetaData.builder();
RoutingTable.Builder routingTableBuilder = RoutingTable.builder();
Settings settings = ImmutableSettings.builder()
.put(IndexMetaData.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetaData.SETTING_NUMBER_OF_REPLICAS, 1)
.build();
metaDateBuilder.put(IndexMetaData.builder(AlertsStore.ALERT_INDEX).settings(settings).numberOfShards(1).numberOfReplicas(1));
IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(AlertsStore.ALERT_INDEX);
indexRoutingTableBuilder.addIndexShard(new IndexShardRoutingTable.Builder(new ShardId(AlertsStore.ALERT_INDEX, 0), false)
.addShard(new ImmutableShardRouting(AlertsStore.ALERT_INDEX, 0, "_node_id", null, true, ShardRoutingState.STARTED, 1))
.build());
indexRoutingTableBuilder.addReplica();
routingTableBuilder.add(indexRoutingTableBuilder.build());
csBuilder.metaData(metaDateBuilder);
csBuilder.routingTable(routingTableBuilder);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
SearchResponse searchResponse1 = mockSearchResponse(1, 1, 2);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse1);
BytesReference source = new BytesArray("{}");
InternalSearchHit hit1 = new InternalSearchHit(0, "_id1", new StringText("type"), Collections.<String, SearchHitField>emptyMap());
hit1.sourceRef(source);
InternalSearchHit hit2 = new InternalSearchHit(1, "_id2", new StringText("type"), Collections.<String, SearchHitField>emptyMap());
hit2.sourceRef(source);
SearchResponse searchResponse2 = mockSearchResponse(1, 1, 2, hit1, hit2);
SearchResponse searchResponse3 = mockSearchResponse(1, 1, 2);
when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(searchResponse2, searchResponse3);
Alert alert1 = mock(Alert.class);
Alert.Status status = new Alert.Status();
when(alert1.status()).thenReturn(status);
Alert alert2 = mock(Alert.class);
when(alert2.status()).thenReturn(status);
when(parser.parse("_id1", true, source)).thenReturn(alert1);
when(parser.parse("_id2", true, source)).thenReturn(alert2);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 0));
ClusterState cs = csBuilder.build();
alertsStore.start(cs, callback);
verifyZeroInteractions(clusterService);
assertThat(alertsStore.started(), is(true));
assertThat(alertsStore.getAlerts().size(), equalTo(2));
verify(callback, times(1)).onSuccess(any(ClusterState.class));
verify(callback, never()).onFailure(any(Throwable.class));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerts");
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
verify(clientProxy, times(1)).search(any(SearchRequest.class));
verify(clientProxy, times(2)).searchScroll(anyString(), any(TimeValue.class));
verify(clientProxy, times(1)).clearScroll(anyString());
}
private RefreshResponse mockRefreshResponse(int total, int successful) {
RefreshResponse refreshResponse = mock(RefreshResponse.class);
when(refreshResponse.getTotalShards()).thenReturn(total);
when(refreshResponse.getSuccessfulShards()).thenReturn(successful);
return refreshResponse;
}
private SearchResponse mockSearchResponse(int total, int successful, int totalHits, InternalSearchHit... hits) {
InternalSearchHits internalSearchHits = new InternalSearchHits(hits, totalHits, 1f);
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.getTotalShards()).thenReturn(total);
when(searchResponse.getSuccessfulShards()).thenReturn(successful);
when(searchResponse.getHits()).thenReturn(internalSearchHits);
when(searchResponse.getHits()).thenReturn(internalSearchHits);
return searchResponse;
}
}

View File

@ -6,7 +6,6 @@
package org.elasticsearch.alerts.history;
import com.google.common.collect.ImmutableSet;
import org.elasticsearch.action.ActionFuture;
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
import org.elasticsearch.action.admin.indices.refresh.RefreshResponse;
import org.elasticsearch.action.index.IndexRequest;
@ -17,8 +16,6 @@ import org.elasticsearch.alerts.Alert;
import org.elasticsearch.alerts.condition.simple.AlwaysTrueCondition;
import org.elasticsearch.alerts.support.TemplateUtils;
import org.elasticsearch.alerts.support.init.proxy.ClientProxy;
import org.elasticsearch.client.AdminClient;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.cluster.ClusterName;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexMetaData;
@ -46,7 +43,6 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.core.IsEqual.equalTo;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.*;
import static org.mockito.Mockito.verifyZeroInteractions;
/**
*/
@ -194,13 +190,14 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
csBuilder.routingTable(routingTableBuilder);
ClusterState cs = csBuilder.build();
mockRefresh(0);
RefreshResponse refreshResponse = mockRefreshResponse(1, 0);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
HistoryStore.LoadResult result = historyStore.loadFiredAlerts(cs, FiredAlert.State.AWAITS_EXECUTION);
assertThat(result.succeeded(), is(false));
assertThat(ImmutableSet.copyOf(result).size(), equalTo(0));
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).admin();
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
}
@Test
@ -225,26 +222,22 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
csBuilder.routingTable(routingTableBuilder);
ClusterState cs = csBuilder.build();
mockRefresh(1);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.getSuccessfulShards()).thenReturn(0);
when(searchResponse.getTotalShards()).thenReturn(1);
ActionFuture<SearchResponse> actionFuture = mock(ActionFuture.class);
when(actionFuture.actionGet()).thenReturn(searchResponse);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(actionFuture);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse);
ClearScrollRequestBuilder clearScrollRequestBuilder = mock(ClearScrollRequestBuilder.class);
when(clearScrollRequestBuilder.addScrollId(anyString())).thenReturn(clearScrollRequestBuilder);
when(clearScrollRequestBuilder.get()).thenReturn(new ClearScrollResponse(true, 1));
when(clientProxy.prepareClearScroll()).thenReturn(clearScrollRequestBuilder);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1));
HistoryStore.LoadResult result = historyStore.loadFiredAlerts(cs, FiredAlert.State.AWAITS_EXECUTION);
assertThat(result.succeeded(), is(false));
assertThat(ImmutableSet.copyOf(result).size(), equalTo(0));
verifyZeroInteractions(templateUtils);
verify(clientProxy, times(1)).admin();
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
}
@Test
@ -269,27 +262,23 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
csBuilder.routingTable(routingTableBuilder);
ClusterState cs = csBuilder.build();
mockRefresh(1);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
SearchResponse searchResponse = mock(SearchResponse.class);
when(searchResponse.getSuccessfulShards()).thenReturn(1);
when(searchResponse.getTotalShards()).thenReturn(1);
when(searchResponse.getHits()).thenReturn(InternalSearchHits.empty());
ActionFuture<SearchResponse> actionFuture = mock(ActionFuture.class);
when(actionFuture.actionGet()).thenReturn(searchResponse);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(actionFuture);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse);
ClearScrollRequestBuilder clearScrollRequestBuilder = mock(ClearScrollRequestBuilder.class);
when(clearScrollRequestBuilder.addScrollId(anyString())).thenReturn(clearScrollRequestBuilder);
when(clearScrollRequestBuilder.get()).thenReturn(new ClearScrollResponse(true, 1));
when(clientProxy.prepareClearScroll()).thenReturn(clearScrollRequestBuilder);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1));
HistoryStore.LoadResult result = historyStore.loadFiredAlerts(cs, FiredAlert.State.AWAITS_EXECUTION);
assertThat(result.succeeded(), is(true));
assertThat(ImmutableSet.copyOf(result).size(), equalTo(0));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerthistory");
verify(clientProxy, times(1)).admin();
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
}
@Test
@ -314,7 +303,8 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
csBuilder.routingTable(routingTableBuilder);
ClusterState cs = csBuilder.build();
mockRefresh(1);
RefreshResponse refreshResponse = mockRefreshResponse(1, 1);
when(clientProxy.refresh(any(RefreshRequest.class))).thenReturn(refreshResponse);
SearchResponse searchResponse1 = mock(SearchResponse.class);
when(searchResponse1.getSuccessfulShards()).thenReturn(1);
@ -325,38 +315,27 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
hit.sourceRef(new BytesArray("{}"));
InternalSearchHits hits = new InternalSearchHits(new InternalSearchHit[]{hit}, 1, 1.0f);
when(searchResponse1.getHits()).thenReturn(hits);
ActionFuture<SearchResponse> actionFuture = mock(ActionFuture.class);
when(actionFuture.actionGet()).thenReturn(searchResponse1);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(actionFuture);
when(clientProxy.search(any(SearchRequest.class))).thenReturn(searchResponse1);
// First return a scroll response with a single hit
SearchScrollRequestBuilder searchScrollRequestBuilder1 = mock(SearchScrollRequestBuilder.class);
when(searchScrollRequestBuilder1.setScroll(any(TimeValue.class))).thenReturn(searchScrollRequestBuilder1);
when(searchScrollRequestBuilder1.get()).thenReturn(searchResponse1);
when(clientProxy.prepareSearchScroll(anyString())).thenReturn(searchScrollRequestBuilder1);
when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(searchResponse1);
// then with no hits
SearchScrollRequestBuilder searchScrollRequestBuilder2 = mock(SearchScrollRequestBuilder.class);
when(searchScrollRequestBuilder2.setScroll(any(TimeValue.class))).thenReturn(searchScrollRequestBuilder2);
SearchResponse searchResponse2 = new SearchResponse(InternalSearchResponse.empty(), null, 1, 1, 1, null);
when(searchScrollRequestBuilder2.get()).thenReturn(searchResponse2);
when(clientProxy.prepareSearchScroll(anyString())).thenReturn(searchScrollRequestBuilder2);
when(clientProxy.searchScroll(anyString(), any(TimeValue.class))).thenReturn(searchResponse2);
FiredAlert firedAlert = mock(FiredAlert.class);
when(firedAlert.state()).thenReturn(FiredAlert.State.AWAITS_EXECUTION);
when(parser.parse(any(BytesReference.class), eq("_id"), eq(1l))).thenReturn(firedAlert);
ClearScrollRequestBuilder clearScrollRequestBuilder = mock(ClearScrollRequestBuilder.class);
when(clearScrollRequestBuilder.addScrollId(anyString())).thenReturn(clearScrollRequestBuilder);
when(clearScrollRequestBuilder.get()).thenReturn(new ClearScrollResponse(true, 1));
when(clientProxy.prepareClearScroll()).thenReturn(clearScrollRequestBuilder);
when(clientProxy.clearScroll(anyString())).thenReturn(new ClearScrollResponse(true, 1));
HistoryStore.LoadResult result = historyStore.loadFiredAlerts(cs, FiredAlert.State.AWAITS_EXECUTION);
assertThat(result.succeeded(), is(true));
assertThat(ImmutableSet.copyOf(result).size(), equalTo(0));
verify(templateUtils, times(1)).ensureIndexTemplateIsLoaded(cs, "alerthistory");
verify(clientProxy, times(1)).admin();
verify(clientProxy, times(1)).refresh(any(RefreshRequest.class));
}
@Test
@ -367,16 +346,11 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
assertThat(HistoryStore.getAlertHistoryIndexNameForTime(new DateTime(2833165811000L, DateTimeZone.UTC)), equalTo(".alert_history_2059-10-12"));
}
private void mockRefresh(int numSuccessfulShards) {
AdminClient adminClient = mock(AdminClient.class);
when(clientProxy.admin()).thenReturn(adminClient);
IndicesAdminClient indicesAdminClient = mock(IndicesAdminClient.class);
when(adminClient.indices()).thenReturn(indicesAdminClient);
ActionFuture<RefreshResponse> actionFuture = mock(ActionFuture.class);
when(indicesAdminClient.refresh(any(RefreshRequest.class))).thenReturn(actionFuture);
private RefreshResponse mockRefreshResponse(int total, int successful) {
RefreshResponse refreshResponse = mock(RefreshResponse.class);
when(actionFuture.actionGet()).thenReturn(refreshResponse);
when(refreshResponse.getSuccessfulShards()).thenReturn(numSuccessfulShards);
when(refreshResponse.getTotalShards()).thenReturn(total);
when(refreshResponse.getSuccessfulShards()).thenReturn(successful);
return refreshResponse;
}
}