Change default throttle period to 5s.

This change makes the default throttle period 5s.
This default can be controlled from config with the parameter `watcher.throttle.period.default_period`.
Added test and updated docs.

Original commit: elastic/x-pack-elasticsearch@cf8f5de724
This commit is contained in:
Brian Murphy 2015-04-16 14:01:53 -04:00
parent 0a7cf71152
commit 90fa55d1eb
2 changed files with 61 additions and 3 deletions

View File

@ -44,12 +44,16 @@ import org.elasticsearch.watcher.trigger.TriggerService;
import java.io.IOException;
import java.util.Locale;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.watcher.support.WatcherDateUtils.*;
public class Watch implements TriggerEngine.Job, ToXContent {
private final static TimeValue DEFAULT_THROTTLE_PERIOD = new TimeValue(5, TimeUnit.SECONDS);
private final static String DEFAULT_THROTTLE_PERIOD_SETTING = "watcher.throttle.period.default_period";
private final String name;
private final Trigger trigger;
private final Input input;
@ -193,6 +197,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
private final Input defaultInput;
private final Condition defaultCondition;
private final TimeValue defaultThrottleTimePeriod;
@Inject
public Parser(Settings settings, LicenseService licenseService, ConditionRegistry conditionRegistry, TriggerService triggerService,
@ -210,6 +215,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
this.defaultInput = new NoneInput(logger);
this.defaultCondition = new AlwaysTrueCondition(logger);
this.defaultThrottleTimePeriod = settings.getAsTime(DEFAULT_THROTTLE_PERIOD_SETTING, DEFAULT_THROTTLE_PERIOD);
}
public Watch parse(String name, boolean includeStatus, BytesReference source) {
@ -231,7 +237,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
Transform transform = null;
Map<String, Object> metatdata = null;
Status status = null;
TimeValue throttlePeriod = null;
TimeValue throttlePeriod = defaultThrottleTimePeriod;
String currentFieldName = null;
XContentParser.Token token = null;

View File

@ -47,7 +47,7 @@ import static org.hamcrest.core.IsEqual.equalTo;
public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
@Test
public void testAckThrottle() throws Exception {
public void test_AckThrottle() throws Exception {
WatcherClient watcherClient = watcherClient();
IndexResponse eventIndexResponse = indexTestDoc();
@ -58,7 +58,8 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
.input(searchInput(matchAllRequest().indices("events")))
.condition(scriptCondition("ctx.payload.hits.total > 0"))
.transform(searchTransform(matchAllRequest().indices("events")))
.addAction("_id", indexAction("actions", "action")))
.addAction("_id", indexAction("actions", "action"))
.throttlePeriod(new TimeValue(0, TimeUnit.SECONDS)))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
@ -109,6 +110,7 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
assertThat(throttledCount, greaterThan(0L));
}
public IndexResponse indexTestDoc() {
createIndex("actions", "events");
ensureGreen("actions", "events");
@ -190,6 +192,7 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
}
}
@Test
@Repeat(iterations = 2)
public void test_ack_with_restart() throws Exception {
@ -255,4 +258,53 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
long countAfterPostAckFires = docCount("actions", "action", matchAllQuery());
assertThat(countAfterPostAckFires, equalTo(countAfterAck));
}
@Test @Repeat(iterations = 10)
public void test_default_TimeThrottle() throws Exception {
WatcherClient watcherClient = watcherClient();
indexTestDoc();
PutWatchResponse putWatchResponse = watcherClient.preparePutWatch()
.setId("_name")
.setSource(watchBuilder()
.trigger(schedule(interval("1s")))
.input(searchInput(matchAllRequest().indices("events")))
.condition(scriptCondition("ctx.payload.hits.total > 0"))
.transform(searchTransform(matchAllRequest().indices("events")))
.addAction("_id", indexAction("actions", "action")))
.get();
assertThat(putWatchResponse.isCreated(), is(true));
if (timeWarped()) {
timeWarp().clock().setTime(DateTime.now(DateTimeZone.UTC));
timeWarp().scheduler().trigger("_name");
refresh();
// the first fire should work
long actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
timeWarp().clock().fastForwardSeconds(2);
timeWarp().scheduler().trigger("_name");
refresh();
// the last fire should have been throttled, so number of actions shouldn't change
actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(1L));
timeWarp().clock().fastForwardSeconds(10);
timeWarp().scheduler().trigger("_name");
refresh();
// the last fire occurred passed the throttle period, so a new action should have been added
actionsCount = docCount("actions", "action", matchAllQuery());
assertThat(actionsCount, is(2L));
long throttledCount = docCount(HistoryStore.INDEX_PREFIX + "*", null,
matchQuery(WatchRecord.Parser.STATE_FIELD.getPreferredName(), WatchRecord.State.THROTTLED.id()));
assertThat(throttledCount, is(1L));
}
}
}