The DateTimes watcher uses for scheduled and trigger times should always be UTC

Before this change DateTimes were being constructed without providing a time zone,
this was causing some non-utc time leakage. In particular watch record ids were being created with non utc dates and
watch records were going to the wrong .watch_history index.
Add Clock.now(DateTimeZone) to allow callers to get UTC now.
Also construct the DateTimes with UTC timezones when we construct from millis.
Add all constuctors of DateTime that do not specify a time zone to the forbidden APIs.
This change makes constructing a `DateTime` object without providing a `DateTimeZone` forbidden.
This is much safer and makes time zone errors much more unlikely to occur.
Statically import DateTimeZone.UTC everywhere it was being used
Now all calls that use DateTimeZone.UTC just refrence UTC.

Fixes elastic/elasticsearch#150

Original commit: elastic/x-pack-elasticsearch@7f23ce605e
This commit is contained in:
Brian Murphy 2015-04-14 23:17:07 -04:00
parent 29c76b9c8a
commit 41e42f0945
27 changed files with 148 additions and 85 deletions

View File

@ -57,4 +57,14 @@ java.nio.channels.GatheringByteChannel#write(java.nio.ByteBuffer[])
java.nio.channels.ReadableByteChannel#read(java.nio.ByteBuffer)
java.nio.channels.ScatteringByteChannel#read(java.nio.ByteBuffer[])
java.nio.channels.ScatteringByteChannel#read(java.nio.ByteBuffer[], int, int)
java.nio.channels.FileChannel#read(java.nio.ByteBuffer, long)
java.nio.channels.FileChannel#read(java.nio.ByteBuffer, long)
@defaultMessage Constructing a DateTime without a time zone is dangerous - use DateTime(DateTimeZone.getDefault()) if you really want the default timezone
org.elasticsearch.common.joda.time.DateTime#<init>()
org.elasticsearch.common.joda.time.DateTime#<init>(long)
org.elasticsearch.common.joda.time.DateTime#<init>(int, int, int, int, int)
org.elasticsearch.common.joda.time.DateTime#<init>(int, int, int, int, int, int)
org.elasticsearch.common.joda.time.DateTime#<init>(int, int, int, int, int, int, int)
org.elasticsearch.common.joda.time.DateTime#now()

View File

@ -9,12 +9,12 @@ import org.elasticsearch.common.io.FastStringReader;
import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.joda.time.format.ISODateTimeFormat;
import java.io.IOException;
import java.util.Properties;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
*/
public class WatcherBuild {
@ -36,7 +36,7 @@ public class WatcherBuild {
}
String gitTimestampRaw = props.getProperty("timestamp");
if (gitTimestampRaw != null) {
timestamp = ISODateTimeFormat.dateTimeNoMillis().withZone(DateTimeZone.UTC).print(Long.parseLong(gitTimestampRaw));
timestamp = ISODateTimeFormat.dateTimeNoMillis().withZone(UTC).print(Long.parseLong(gitTimestampRaw));
}
} catch (Exception e) {
// just ignore...

View File

@ -20,9 +20,9 @@ import javax.mail.internet.InternetAddress;
import javax.mail.internet.MimeMessage;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.text.ParseException;
import java.util.*;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
*
*/
@ -50,7 +50,7 @@ public class Email implements ToXContent {
this.from = from;
this.replyTo = replyTo;
this.priority = priority;
this.sentDate = sentDate != null ? sentDate : new DateTime();
this.sentDate = sentDate != null ? sentDate : new DateTime(UTC);
this.to = to;
this.cc = cc;
this.bcc = bcc;

View File

@ -42,6 +42,7 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
*/
public class ExecutionService extends AbstractComponent {
@ -121,7 +122,7 @@ public class ExecutionService extends AbstractComponent {
final LinkedList<WatchRecord> records = new LinkedList<>();
final LinkedList<TriggeredExecutionContext> contexts = new LinkedList<>();
DateTime now = clock.now();
DateTime now = clock.now(UTC);
for (TriggerEvent event : events) {
Watch watch = watchStore.get(event.jobName());
if (watch == null) {
@ -182,7 +183,7 @@ public class ExecutionService extends AbstractComponent {
final LinkedList<WatchRecord> records = new LinkedList<>();
final LinkedList<TriggeredExecutionContext> contexts = new LinkedList<>();
DateTime now = clock.now();
DateTime now = clock.now(UTC);
for (TriggerEvent event : events) {
Watch watch = watchStore.get(event.jobName());
if (watch == null) {
@ -291,7 +292,7 @@ public class ExecutionService extends AbstractComponent {
logger.warn("unable to find watch [{}]/[{}] in watch store. perhaps it has been deleted. skipping...", record.name(), record.id());
continue;
}
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, clock.now(), record.triggerEvent());
TriggeredExecutionContext ctx = new TriggeredExecutionContext(watch, clock.now(UTC), record.triggerEvent());
executeAsync(ctx, record);
counter++;
}

View File

@ -9,7 +9,6 @@ import org.elasticsearch.common.base.Predicate;
import org.elasticsearch.common.base.Predicates;
import org.elasticsearch.common.collect.ImmutableSet;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.watcher.condition.Condition;
import org.elasticsearch.watcher.input.Input;
import org.elasticsearch.watcher.throttle.Throttler;
@ -19,6 +18,8 @@ import org.elasticsearch.watcher.watch.Watch;
import java.util.HashMap;
import java.util.Set;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
*/
public class ManualExecutionContext extends WatchExecutionContext {
@ -117,7 +118,7 @@ public class ManualExecutionContext extends WatchExecutionContext {
public ManualExecutionContext build() {
if (executionTime == null) {
executionTime = DateTime.now(DateTimeZone.UTC);
executionTime = DateTime.now(UTC);
}
if (triggerEvent == null) {
triggerEvent = new ManualTriggerEvent(watch.id(), executionTime, new HashMap<String, Object>());

View File

@ -5,13 +5,14 @@
*/
package org.elasticsearch.watcher.support;
import org.elasticsearch.watcher.WatcherSettingsException;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.joda.FormatDateTimeFormatter;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.index.mapper.core.DateFieldMapper;
import org.elasticsearch.watcher.WatcherSettingsException;
import java.io.IOException;
@ -25,20 +26,21 @@ public class WatcherDateUtils {
private WatcherDateUtils() {
}
public static DateTime parseDate(String format) {
return dateTimeFormatter.parser().parseDateTime(format);
public static DateTime parseDate(String format, DateTimeZone timeZone) {
DateTime dateTime = dateTimeFormatter.parser().parseDateTime(format);
return dateTime.toDateTime(timeZone);
}
public static String formatDate(DateTime date) {
return dateTimeFormatter.printer().print(date);
}
public static DateTime parseDate(String fieldName, XContentParser.Token token, XContentParser parser) throws IOException {
public static DateTime parseDate(String fieldName, XContentParser.Token token, XContentParser parser, DateTimeZone timeZone) throws IOException {
if (token == XContentParser.Token.VALUE_NUMBER) {
return new DateTime(parser.longValue());
return new DateTime(parser.longValue(), timeZone);
}
if (token == XContentParser.Token.VALUE_STRING) {
return dateTimeFormatter.parser().parseDateTime(parser.text());
return parseDate(parser.text(), timeZone);
}
if (token == XContentParser.Token.VALUE_NULL) {
return null;
@ -51,8 +53,8 @@ public class WatcherDateUtils {
out.writeLong(date.getMillis());
}
public static DateTime readDate(StreamInput in) throws IOException {
return new DateTime(in.readLong());
public static DateTime readDate(StreamInput in, DateTimeZone timeZone) throws IOException {
return new DateTime(in.readLong(), timeZone);
}
public static void writeOptionalDate(StreamOutput out, DateTime date) throws IOException {
@ -64,7 +66,7 @@ public class WatcherDateUtils {
out.writeLong(date.getMillis());
}
public static DateTime readOptionalDate(StreamInput in) throws IOException {
return in.readBoolean() ? new DateTime(in.readLong()) : null;
public static DateTime readOptionalDate(StreamInput in, DateTimeZone timeZone) throws IOException {
return in.readBoolean() ? new DateTime(in.readLong(), timeZone) : null;
}
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.support.clock;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.unit.TimeValue;
/**
@ -19,6 +20,8 @@ public interface Clock {
DateTime now();
DateTime now(DateTimeZone timeZone);
TimeValue timeElapsedSince(DateTime time);
}

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.support.clock;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.unit.TimeValue;
/**
@ -30,9 +31,15 @@ public final class SystemClock implements Clock {
@Override
public DateTime now() {
return DateTime.now();
return now(DateTimeZone.getDefault());
}
@Override
public DateTime now(DateTimeZone timeZone) {
return DateTime.now(timeZone);
}
@Override
public TimeValue timeElapsedSince(DateTime time) {
return TimeValue.timeValueMillis(System.currentTimeMillis() - time.getMillis());

View File

@ -35,6 +35,7 @@ import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.watch.Watch;
import org.elasticsearch.watcher.watch.WatchStore;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
* Performs the watch execution operation.
*/
@ -78,7 +79,7 @@ public class TransportExecuteWatchAction extends WatcherTransportAction<ExecuteW
}
ManualExecutionContext.Builder ctxBuilder = ManualExecutionContext.builder(watch);
DateTime executionTime = clock.now();
DateTime executionTime = clock.now(UTC);
ctxBuilder.executionTime(executionTime);
if (request.isSimulateAllActions()) {
ctxBuilder.simulateAllActions();

View File

@ -17,6 +17,7 @@ import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
*/
public class ManualTriggerEvent extends TriggerEvent {
@ -59,7 +60,7 @@ public class ManualTriggerEvent extends TriggerEvent {
} else {
if (token == XContentParser.Token.VALUE_STRING) {
if (TRIGGERED_TIME_FIELD.match(currentFieldName)) {
triggeredTime = WatcherDateUtils.parseDate(parser.text());
triggeredTime = WatcherDateUtils.parseDate(parser.text(), UTC);
} else {
throw new ParseException("could not parse trigger event for [" + context + "]. unknown string value field [" + currentFieldName + "]");
}

View File

@ -15,6 +15,8 @@ import org.elasticsearch.watcher.trigger.TriggerEvent;
import java.io.IOException;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
*
*/
@ -63,9 +65,9 @@ public class ScheduleTriggerEvent extends TriggerEvent {
} else {
if (token == XContentParser.Token.VALUE_STRING) {
if (TRIGGERED_TIME_FIELD.match(currentFieldName)) {
triggeredTime = WatcherDateUtils.parseDate(parser.text());
triggeredTime = WatcherDateUtils.parseDate(parser.text(), UTC);
} else if (SCHEDULED_TIME_FIELD.match(currentFieldName)) {
scheduledTime = WatcherDateUtils.parseDate(parser.text());
scheduledTime = WatcherDateUtils.parseDate(parser.text(), UTC);
} else {
throw new ParseException("could not parse trigger event for [" + context + "]. unknown string value field [" + currentFieldName + "]");
}

View File

@ -23,6 +23,7 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
*
*/
@ -85,8 +86,8 @@ public class SchedulerScheduleTriggerEngine extends ScheduleTriggerEngine {
}
protected void notifyListeners(String name, long triggeredTime, long scheduledTime) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime), new DateTime(scheduledTime));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime), new DateTime(scheduledTime));
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", name, new DateTime(triggeredTime, UTC), new DateTime(scheduledTime, UTC));
final ScheduleTriggerEvent event = new ScheduleTriggerEvent(name, new DateTime(triggeredTime, UTC), new DateTime(scheduledTime, UTC));
for (Listener listener : listeners) {
listener.triggered(ImmutableList.<TriggerEvent>of(event));
}

View File

@ -21,6 +21,7 @@ import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
*
*/
@ -77,8 +78,8 @@ public class TickerScheduleTriggerEngine extends ScheduleTriggerEngine {
for (ActiveSchedule schedule : schedules.values()) {
long scheduledTime = schedule.check(triggeredTime);
if (scheduledTime > 0) {
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name, new DateTime(triggeredTime), new DateTime(scheduledTime));
events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime), new DateTime(scheduledTime)));
logger.trace("triggered job [{}] at [{}] (scheduled time was [{}])", schedule.name, new DateTime(triggeredTime, UTC), new DateTime(scheduledTime, UTC));
events.add(new ScheduleTriggerEvent(schedule.name, new DateTime(triggeredTime, UTC), new DateTime(scheduledTime, UTC)));
if (events.size() >= 1000) {
notifyListeners(ImmutableList.copyOf(events));
events.clear();

View File

@ -10,6 +10,7 @@ import org.elasticsearch.common.cli.CliToolConfig;
import org.elasticsearch.common.cli.Terminal;
import org.elasticsearch.common.cli.commons.CommandLine;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.joda.time.format.DateTimeFormat;
import org.elasticsearch.common.joda.time.format.DateTimeFormatter;
import org.elasticsearch.common.settings.Settings;
@ -82,7 +83,7 @@ public class CronEvalTool extends CliTool {
terminal.println("Valid!");
DateTime date = DateTime.now();
DateTime date = DateTime.now(DateTimeZone.getDefault());
terminal.println("Now is [" + formatter.print(date) + "]");
terminal.println("Here are the next " + count + " times this cron expression will trigger:");

View File

@ -14,7 +14,6 @@ import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.io.stream.Streamable;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.ToXContent;
@ -47,6 +46,7 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.support.WatcherDateUtils.*;
public class Watch implements TriggerEngine.Job, ToXContent {
@ -129,7 +129,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
* @return {@code true} if the status of this watch changed, {@code false} otherwise.
*/
public boolean ack() {
return status.onAck(new DateTime());
return status.onAck(new DateTime(UTC));
}
public boolean acked() {
@ -486,11 +486,11 @@ public class Watch implements TriggerEngine.Job, ToXContent {
@Override
public void readFrom(StreamInput in) throws IOException {
version = in.readLong();
lastChecked = readOptionalDate(in);
lastMetCondition = readOptionalDate(in);
lastExecuted = readOptionalDate(in);
lastThrottle = in.readBoolean() ? new Throttle(readDate(in), in.readString()) : null;
ackStatus = new AckStatus(AckStatus.State.valueOf(in.readString()), readDate(in));
lastChecked = readOptionalDate(in, UTC);
lastMetCondition = readOptionalDate(in, UTC);
lastExecuted = readOptionalDate(in, UTC);
lastThrottle = in.readBoolean() ? new Throttle(readDate(in, UTC), in.readString()) : null;
ackStatus = new AckStatus(AckStatus.State.valueOf(in.readString()), readDate(in, UTC));
}
public static Status read(StreamInput in) throws IOException {
@ -539,19 +539,19 @@ public class Watch implements TriggerEngine.Job, ToXContent {
currentFieldName = parser.currentName();
} else if (LAST_CHECKED_FIELD.match(currentFieldName)) {
if (token.isValue()) {
lastChecked = parseDate(currentFieldName, token, parser);
lastChecked = parseDate(currentFieldName, token, parser, UTC);
} else {
throw new WatcherException("expecting field [" + currentFieldName + "] to hold a date value, found [" + token + "] instead");
}
} else if (LAST_MET_CONDITION_FIELD.match(currentFieldName)) {
if (token.isValue()) {
lastMetCondition = parseDate(currentFieldName, token, parser);
lastMetCondition = parseDate(currentFieldName, token, parser, UTC);
} else {
throw new WatcherException("expecting field [" + currentFieldName + "] to hold a date value, found [" + token + "] instead");
}
} else if (LAST_EXECUTED_FIELD.match(currentFieldName)) {
if (token.isValue()) {
lastExecuted = parseDate(currentFieldName, token, parser);
lastExecuted = parseDate(currentFieldName, token, parser, UTC);
} else {
throw new WatcherException("expecting field [" + currentFieldName + "] to hold a date value, found [" + token + "] instead");
}
@ -564,7 +564,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (TIMESTAMP_FIELD.match(currentFieldName)) {
timestamp = parseDate(currentFieldName, token, parser);
timestamp = parseDate(currentFieldName, token, parser, UTC);
} else if (REASON_FIELD.match(currentFieldName)) {
reason = parser.text();
} else {
@ -585,7 +585,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
currentFieldName = parser.currentName();
} else if (token.isValue()) {
if (TIMESTAMP_FIELD.match(currentFieldName)) {
timestamp = parseDate(currentFieldName, token, parser);
timestamp = parseDate(currentFieldName, token, parser, UTC);
} else if (STATE_FIELD.match(currentFieldName)) {
state = AckStatus.State.valueOf(parser.text().toUpperCase(Locale.ROOT));
} else {
@ -616,7 +616,7 @@ public class Watch implements TriggerEngine.Job, ToXContent {
private final DateTime timestamp;
public AckStatus() {
this(State.AWAITS_EXECUTION, new DateTime(DateTimeZone.UTC));
this(State.AWAITS_EXECUTION, new DateTime(UTC));
}
public AckStatus(State state, DateTime timestamp) {

View File

@ -312,7 +312,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
Email email = Email.builder().id("_id")
.from(new Email.Address("from@domain"))
.to(Email.AddressList.parse("to@domain"))
.sentDate(new DateTime())
.sentDate(new DateTime(UTC))
.subject("_subject")
.textBody("_text_body")
.build();
@ -361,7 +361,7 @@ public class EmailActionTests extends ElasticsearchTestCase {
Email email = Email.builder().id("_id")
.from(new Email.Address("from@domain"))
.to(Email.AddressList.parse("to@domain"))
.sentDate(new DateTime())
.sentDate(new DateTime(UTC))
.subject("_subject")
.textBody("_text_body")
.build();

View File

@ -7,7 +7,6 @@ package org.elasticsearch.watcher.execution;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.actions.Action;
@ -32,6 +31,7 @@ import org.junit.Test;
import java.util.Arrays;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.hamcrest.Matchers.*;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.same;
@ -93,7 +93,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus);
DateTime now = DateTime.now(DateTimeZone.UTC);
DateTime now = DateTime.now(UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event);
@ -139,7 +139,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus);
DateTime now = DateTime.now(DateTimeZone.UTC);
DateTime now = DateTime.now(UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event);
@ -185,7 +185,7 @@ public class ExecutionServiceTests extends ElasticsearchTestCase {
when(watch.actions()).thenReturn(actions);
when(watch.status()).thenReturn(watchStatus);
DateTime now = DateTime.now(DateTimeZone.UTC);
DateTime now = DateTime.now(UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_id", now, now);
WatchExecutionContext context = new TriggeredExecutionContext(watch, now, event);

View File

@ -8,7 +8,6 @@ package org.elasticsearch.watcher.history;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.cluster.ClusterService;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.watcher.condition.ExecutableCondition;
import org.elasticsearch.watcher.condition.always.ExecutableAlwaysCondition;
import org.elasticsearch.watcher.execution.Wid;
@ -36,7 +35,7 @@ public class HistoryStoreLifeCycleTest extends AbstractWatcherIntegrationTests {
// Put watch records and verify that these are stored
WatchRecord[] watchRecords = new WatchRecord[randomIntBetween(1, 50)];
for (int i = 0; i < watchRecords.length; i++) {
DateTime dateTime = new DateTime(i, DateTimeZone.UTC);
DateTime dateTime = new DateTime(i, UTC);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), dateTime, dateTime);
Wid wid = new Wid("record_" + i, randomLong(), DateTime.now(UTC));
watchRecords[i] = new WatchRecord(wid, watch, event);

View File

@ -20,7 +20,6 @@ import org.elasticsearch.cluster.routing.*;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.text.StringText;
@ -43,6 +42,7 @@ import org.junit.Test;
import java.util.Collection;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.test.WatcherMatchers.indexRequest;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.notNullValue;
@ -76,8 +76,8 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
IndexResponse indexResponse = mock(IndexResponse.class);
@ -96,8 +96,8 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
watchRecord.version(4l);
@ -117,8 +117,8 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
historyStore.stop();
@ -137,8 +137,8 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
when(watch.condition()).thenReturn(new ExecutableAlwaysCondition(logger));
when(watch.input()).thenReturn(null);
when(watch.metadata()).thenReturn(null);
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, DateTimeZone.UTC));
ScheduleTriggerEvent event = new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC));
Wid wid = new Wid("_name", 0, new DateTime(0, UTC));
WatchRecord watchRecord = new WatchRecord(wid, watch, event);
historyStore.stop();
@ -373,10 +373,10 @@ public class HistoryStoreTests extends ElasticsearchTestCase {
@Test
public void testIndexNameGeneration() {
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(0, DateTimeZone.UTC)), equalTo(".watch_history_1970-01-01"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(100000000000L, DateTimeZone.UTC)), equalTo(".watch_history_1973-03-03"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(1416582852000L, DateTimeZone.UTC)), equalTo(".watch_history_2014-11-21"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(2833165811000L, DateTimeZone.UTC)), equalTo(".watch_history_2059-10-12"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(0, UTC)), equalTo(".watch_history_1970-01-01"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(100000000000L, UTC)), equalTo(".watch_history_1973-03-03"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(1416582852000L, UTC)), equalTo(".watch_history_2014-11-21"));
assertThat(HistoryStore.getHistoryIndexNameForTime(new DateTime(2833165811000L, UTC)), equalTo(".watch_history_2059-10-12"));
}
private RefreshResponse mockRefreshResponse(int total, int successful) {

View File

@ -9,7 +9,6 @@ import org.elasticsearch.ElasticsearchIllegalArgumentException;
import org.elasticsearch.common.collect.ImmutableMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentHelper;
@ -42,6 +41,7 @@ import org.junit.Test;
import java.util.ArrayList;
import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.Matchers.*;
import static org.mockito.Matchers.any;
@ -92,8 +92,8 @@ public class HttpInputTests extends ElasticsearchTestCase {
null,
new Watch.Status());
WatchExecutionContext ctx = new TriggeredExecutionContext(watch,
new DateTime(0, DateTimeZone.UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
new DateTime(0, UTC),
new ScheduleTriggerEvent(watch.id(), new DateTime(0, UTC), new DateTime(0, UTC)));
HttpInput.Result result = input.execute(ctx);
assertThat(result.type(), equalTo(HttpInput.TYPE));
assertThat(result.payload().data(), equalTo(MapBuilder.<String, Object>newMapBuilder().put("key", "value").map()));

View File

@ -8,7 +8,6 @@ package org.elasticsearch.watcher.input.search;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.xcontent.ToXContent;
import org.elasticsearch.common.xcontent.XContentBuilder;
@ -42,6 +41,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.rangeFilter;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
@ -80,8 +80,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
null,
null,
new Watch.Status()),
new DateTime(0, DateTimeZone.UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
new DateTime(0, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)));
SearchInput.Result result = searchInput.execute(ctx);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));
@ -118,8 +118,8 @@ public class SearchInputTests extends ElasticsearchIntegrationTest {
null,
null,
new Watch.Status()),
new DateTime(0, DateTimeZone.UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, DateTimeZone.UTC), new DateTime(0, DateTimeZone.UTC)));
new DateTime(0, UTC),
new ScheduleTriggerEvent("test-watch", new DateTime(0, UTC), new DateTime(0, UTC)));
SearchInput.Result result = searchInput.execute(ctx);
assertThat((Integer) XContentMapValues.extractValue("hits.total", result.payload().data()), equalTo(0));

View File

@ -6,6 +6,7 @@
package org.elasticsearch.watcher.support.clock;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.joda.time.Duration;
import org.elasticsearch.common.unit.TimeValue;
@ -33,6 +34,11 @@ public class ClockMock implements Clock {
return now;
}
@Override
public DateTime now(DateTimeZone timeZone) {
return now.toDateTime(timeZone);
}
@Override
public TimeValue timeElapsedSince(DateTime time) {
return TimeValue.timeValueMillis(new Duration(time, now).getMillis());

View File

@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.clock;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.hamcrest.Matchers.equalTo;
/**
*/
public class ClockTests extends ElasticsearchTestCase {
@Test
public void test_now_UTC() {
Clock clockMock = new ClockMock();
assertThat(clockMock.now(UTC).getZone(), equalTo(UTC));
assertThat(SystemClock.INSTANCE.now(UTC).getZone(), equalTo(UTC));
}
}

View File

@ -11,7 +11,6 @@ import org.elasticsearch.action.get.GetRequest;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.watcher.client.WatcherClient;
import org.elasticsearch.watcher.history.HistoryStore;
@ -28,6 +27,7 @@ import org.junit.Test;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery;
import static org.elasticsearch.index.query.QueryBuilders.matchQuery;
import static org.elasticsearch.watcher.actions.ActionBuilders.indexAction;
@ -142,7 +142,7 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
assertThat(putWatchResponse.isCreated(), is(true));
if (timeWarped()) {
timeWarp().clock().setTime(DateTime.now(DateTimeZone.UTC));
timeWarp().clock().setTime(DateTime.now(UTC));
timeWarp().scheduler().trigger("_name");
refresh();
@ -283,7 +283,7 @@ public class WatchThrottleTests extends AbstractWatcherIntegrationTests {
assertThat(putWatchResponse.isCreated(), is(true));
if (timeWarped()) {
timeWarp().clock().setTime(DateTime.now(DateTimeZone.UTC));
timeWarp().clock().setTime(DateTime.now(UTC));
timeWarp().scheduler().trigger("_name");
refresh();

View File

@ -15,6 +15,7 @@ import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.elasticsearch.watcher.support.init.proxy.ClientProxy;
import org.elasticsearch.watcher.test.AbstractWatcherIntegrationTests;
import org.elasticsearch.watcher.transform.search.ExecutableSearchTransform;
@ -22,12 +23,12 @@ import org.elasticsearch.watcher.transform.search.SearchTransform;
import org.elasticsearch.watcher.transform.search.SearchTransformFactory;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import org.elasticsearch.watcher.watch.Payload;
import org.elasticsearch.watcher.execution.WatchExecutionContext;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.elasticsearch.index.query.FilterBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.filteredQuery;
@ -113,8 +114,8 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
ExecutableSearchTransform transform = new ExecutableSearchTransform(new SearchTransform(request), logger, scriptService(), ClientProxy.of(client()));
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00"), parseDate("2015-01-01T00:00:00"));
WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00"), event, EMPTY_PAYLOAD);
ScheduleTriggerEvent event = new ScheduleTriggerEvent("_name", parseDate("2015-01-04T00:00:00", UTC), parseDate("2015-01-01T00:00:00", UTC));
WatchExecutionContext ctx = mockExecutionContext("_name", parseDate("2015-01-04T00:00:00", UTC), event, EMPTY_PAYLOAD);
Payload payload = simplePayload("value", "val_3");
@ -199,7 +200,7 @@ public class SearchTransformTests extends AbstractWatcherIntegrationTests {
private static Map<String, Object> doc(String date, String value) {
Map<String, Object> doc = new HashMap<>();
doc.put("date", parseDate(date));
doc.put("date", parseDate(date, UTC));
doc.put("value", value);
return doc;
}

View File

@ -15,9 +15,9 @@ import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.watcher.support.clock.Clock;
import org.elasticsearch.watcher.support.clock.ClockMock;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.watcher.trigger.schedule.ScheduleRegistry;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTrigger;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEngine;
import org.elasticsearch.watcher.trigger.schedule.ScheduleTriggerEvent;
import java.io.IOException;
@ -25,6 +25,7 @@ import java.util.Collection;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
/**
* A mock scheduler to help with unit testing. Provide {@link ScheduleTriggerEngineMock#trigger} method to manually trigger
* jobs.
@ -80,7 +81,7 @@ public class ScheduleTriggerEngineMock extends ScheduleTriggerEngine {
public void trigger(String jobName, int times, TimeValue interval) {
for (int i = 0; i < times; i++) {
DateTime now = clock.now();
DateTime now = clock.now(UTC);
logger.debug("firing [" + jobName + "] at [" + now + "]");
ScheduleTriggerEvent event = new ScheduleTriggerEvent(jobName, now, now);
for (Listener listener : listeners) {

View File

@ -7,7 +7,6 @@ package org.elasticsearch.watcher.trigger.schedule.engine;
import org.apache.lucene.util.LuceneTestCase.Slow;
import org.elasticsearch.common.joda.time.DateTime;
import org.elasticsearch.common.joda.time.DateTimeZone;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.elasticsearch.watcher.trigger.Trigger;
import org.elasticsearch.watcher.trigger.TriggerEngine;
@ -28,6 +27,7 @@ import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.common.joda.time.DateTimeZone.UTC;
import static org.elasticsearch.watcher.trigger.schedule.Schedules.*;
import static org.hamcrest.Matchers.is;
@ -101,7 +101,7 @@ public abstract class BaseTriggerEngineTests extends ElasticsearchTestCase {
latch.countDown();
}
});
DateTime now = new DateTime(DateTimeZone.UTC);
DateTime now = new DateTime(UTC);
Minute minOfHour = new Minute(now);
if (now.getSecondOfMinute() < 58) {
minOfHour.inc(1);
@ -135,7 +135,7 @@ public abstract class BaseTriggerEngineTests extends ElasticsearchTestCase {
}
}
});
DateTime now = new DateTime(DateTimeZone.UTC);
DateTime now = new DateTime(UTC);
Minute minOfHour = new Minute(now);
Hour hourOfDay = new Hour(now);
boolean jumpedHour = now.getSecondOfMinute() < 29 ? minOfHour.inc(1) : minOfHour.inc(2);
@ -171,7 +171,7 @@ public abstract class BaseTriggerEngineTests extends ElasticsearchTestCase {
latch.countDown();
}
});
DateTime now = new DateTime(DateTimeZone.UTC);
DateTime now = new DateTime(UTC);
Minute minOfHour = new Minute(now);
Hour hourOfDay = new Hour(now);
Day dayOfWeek = new Day(now);