Remove `InternalTestCluster.startNode(s)Async` (elastic/elasticsearch#4198)

A companion PR to https://github.com/elastic/elasticsearch/pull/21846 where the above methods were removed. See ES PR for details.

With the concurrent starting the issues with licenses and time freeze became more apparent and I had to apply my suggestion to only freeze time once the license has been applied (as opposed to freeze on node start up). Since this also means that a node that starts up after the cluster time has been frozen need to also immediately freeze, it felt natural to use a `ServiceDisruptionScheme`. Although the name doesn't really make sense here, it all has all the logic we need. 

Original commit: elastic/x-pack-elasticsearch@5641742f60
This commit is contained in:
Boaz Leskes 2016-12-06 12:07:28 +01:00 committed by GitHub
parent 7a996da86d
commit affdf10274
12 changed files with 162 additions and 74 deletions

View File

@ -41,7 +41,7 @@ public abstract class AbstractLicenseServiceTestCase extends ESTestCase {
@Before
public void init() throws Exception {
clusterService = mock(ClusterService.class);
clock = new ClockMock();
clock = ClockMock.frozen();
discoveryNodes = mock(DiscoveryNodes.class);
resourceWatcherService = mock(ResourceWatcherService.class);
environment = mock(Environment.class);

View File

@ -13,12 +13,10 @@ import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.terms.StringTerms;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.ESIntegTestCase.Scope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.xpack.monitoring.MonitoringSettings;
import org.elasticsearch.xpack.monitoring.test.MonitoringIntegTestCase;
import org.junit.After;
import java.util.List;
import java.util.concurrent.TimeUnit;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertNoTimeout;
@ -49,22 +47,19 @@ public class MultiNodesStatsTests extends MonitoringIntegTestCase {
int n = randomIntBetween(1, 2);
logger.debug("--> starting {} master only nodes", n);
InternalTestCluster.Async<List<String>> masterNodes = internalCluster().startMasterOnlyNodesAsync(n);
masterNodes.get();
internalCluster().startMasterOnlyNodes(n);
nodes += n;
n = randomIntBetween(2, 3);
logger.debug("--> starting {} data only nodes", n);
InternalTestCluster.Async<List<String>> dataNodes = internalCluster().startDataOnlyNodesAsync(n);
dataNodes.get();
internalCluster().startDataOnlyNodes(n);
nodes += n;
n = randomIntBetween(1, 2);
logger.debug("--> starting {} client only nodes", n);
InternalTestCluster.Async<List<String>> clientNodes = internalCluster().startNodesAsync(n,
internalCluster().startNodes(n,
Settings.builder().put(Node.NODE_DATA_SETTING.getKey(), false).put(Node.NODE_MASTER_SETTING.getKey(), false)
.put(Node.NODE_INGEST_SETTING.getKey(), false).build());
clientNodes.get();
nodes += n;
n = randomIntBetween(1, 2);

View File

@ -7,73 +7,95 @@ package org.elasticsearch.xpack.support.clock;
import org.elasticsearch.common.unit.TimeValue;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
/**
* A clock that can be modified for testing.
*/
public class ClockMock extends Clock {
private final ZoneId zoneId;
private DateTime now;
private volatile Clock wrappedClock;
public ClockMock() {
zoneId = ZoneOffset.UTC;
now = DateTime.now(DateTimeZone.UTC);
this(Clock.systemUTC());
}
private ClockMock(ZoneId zoneId) {
this.zoneId = zoneId;
now = DateTime.now(DateTimeZone.forID(zoneId.getId()));
/**
* a utility method to create a new {@link ClockMock} and immediately call its {@link #freeze()} method
*/
public static ClockMock frozen() {
return new ClockMock().freeze();
}
private ClockMock(Clock wrappedClock) {
this.wrappedClock = wrappedClock;
}
@Override
public ZoneId getZone() {
return ZoneOffset.UTC;
return wrappedClock.getZone();
}
@Override
public Clock withZone(ZoneId zoneId) {
if (zoneId.equals(this.zoneId)) {
public synchronized Clock withZone(ZoneId zoneId) {
if (zoneId.equals(wrappedClock.getZone())) {
return this;
}
return new ClockMock(zoneId);
return new ClockMock(wrappedClock.withZone(zoneId));
}
@Override
public long millis() {
return now.getMillis();
return wrappedClock.millis();
}
@Override
public Instant instant() {
return Instant.ofEpochMilli(now.getMillis());
return wrappedClock.instant();
}
public ClockMock setTime(DateTime now) {
this.now = now;
public synchronized void setTime(DateTime now) {
setTime(Instant.ofEpochMilli(now.getMillis()));
}
private void setTime(Instant now) {
assert Thread.holdsLock(this);
this.wrappedClock = Clock.fixed(now, getZone());
}
/** freeze the time for this clock, preventing it from advancing */
public synchronized ClockMock freeze() {
setTime(instant());
return this;
}
public ClockMock fastForward(TimeValue timeValue) {
return setTime(now.plusMillis((int) timeValue.millis()));
/** the clock will be reset to current time and will advance from now */
public synchronized ClockMock unfreeze() {
wrappedClock = Clock.system(getZone());
return this;
}
public ClockMock fastForwardSeconds(int seconds) {
return fastForward(TimeValue.timeValueSeconds(seconds));
/** freeze the clock if not frozen and advance it by the given time */
public synchronized void fastForward(TimeValue timeValue) {
setTime(instant().plusMillis(timeValue.getMillis()));
}
public ClockMock rewind(TimeValue timeValue) {
return setTime(now.minusMillis((int) timeValue.millis()));
/** freeze the clock if not frozen and advance it by the given amount of seconds */
public void fastForwardSeconds(int seconds) {
fastForward(TimeValue.timeValueSeconds(seconds));
}
public ClockMock rewindSeconds(int seconds) {
return rewind(TimeValue.timeValueSeconds(seconds));
/** freeze the clock if not frozen and rewind it by the given time */
public synchronized void rewind(TimeValue timeValue) {
setTime(instant().minusMillis((int) timeValue.millis()));
}
/** freeze the clock if not frozen and rewind it by the given number of seconds */
public void rewindSeconds(int seconds) {
rewind(TimeValue.timeValueSeconds(seconds));
}
}

View File

@ -11,12 +11,23 @@ import org.joda.time.DateTime;
import java.time.Clock;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.joda.time.DateTimeZone.UTC;
public class ClockTests extends ESTestCase {
public void testNowUTC() {
Clock clockMock = new ClockMock();
Clock clockMock = ClockMock.frozen();
assertThat(new DateTime(clockMock.millis(), UTC).getZone(), equalTo(UTC));
assertThat(new DateTime(Clock.systemUTC().millis(), UTC).getZone(), equalTo(UTC));
}
public void testFreezeUnfreeze() throws Exception {
ClockMock clockMock = ClockMock.frozen();
final long millis = clockMock.millis();
for (int i = 0; i < 10; i++) {
assertThat(clockMock.millis(), equalTo(millis));
}
clockMock.unfreeze();
assertBusy(() -> assertThat(clockMock.millis(), greaterThan(millis)));
}
}

View File

@ -58,7 +58,7 @@ public class WatcherServiceTests extends ESTestCase {
watchParser = mock(Watch.Parser.class);
ExecutionService executionService = mock(ExecutionService.class);
WatchLockService watchLockService = mock(WatchLockService.class);
clock = new ClockMock();
clock = ClockMock.frozen();
WatcherIndexTemplateRegistry watcherIndexTemplateRegistry = mock(WatcherIndexTemplateRegistry.class);
watcherService = new WatcherService(Settings.EMPTY, clock, triggerService, watchStore, watchParser, executionService,
watchLockService, watcherIndexTemplateRegistry);

View File

@ -144,7 +144,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
}
public void testExecuteDateMath() {
ClockMock clock = new ClockMock();
ClockMock clock = ClockMock.frozen();
boolean met = randomBoolean();
ArrayCompareCondition.Op op = met ?
randomFrom(ArrayCompareCondition.Op.GT, ArrayCompareCondition.Op.GTE, ArrayCompareCondition.Op.NOT_EQ) :
@ -183,7 +183,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
ArrayCompareCondition condition = (ArrayCompareCondition) ArrayCompareCondition.parse(new ClockMock(), "_id", parser);
ArrayCompareCondition condition = (ArrayCompareCondition) ArrayCompareCondition.parse(ClockMock.frozen(), "_id", parser);
assertThat(condition, notNullValue());
assertThat(condition.getArrayPath(), is("key1.key2"));
@ -218,7 +218,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
expectedException.expect(ElasticsearchParseException.class);
expectedException.expectMessage("duplicate comparison operator");
ArrayCompareCondition.parse(new ClockMock(), "_id", parser);
ArrayCompareCondition.parse(ClockMock.frozen(), "_id", parser);
}
public void testParseContainsUnknownOperator() throws IOException {
@ -241,7 +241,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
expectedException.expect(ElasticsearchParseException.class);
expectedException.expectMessage("unknown comparison operator");
ArrayCompareCondition.parse(new ClockMock(), "_id", parser);
ArrayCompareCondition.parse(ClockMock.frozen(), "_id", parser);
}
public void testParseContainsDuplicateValue() throws IOException {
@ -266,7 +266,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
expectedException.expect(ElasticsearchParseException.class);
expectedException.expectMessage("duplicate field \"value\"");
ArrayCompareCondition.parse(new ClockMock(), "_id", parser);
ArrayCompareCondition.parse(ClockMock.frozen(), "_id", parser);
}
public void testParseContainsDuplicateQuantifier() throws IOException {
@ -291,7 +291,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
expectedException.expect(ElasticsearchParseException.class);
expectedException.expectMessage("duplicate field \"quantifier\"");
ArrayCompareCondition.parse(new ClockMock(), "_id", parser);
ArrayCompareCondition.parse(ClockMock.frozen(), "_id", parser);
}
public void testParseContainsUnknownQuantifier() throws IOException {
@ -314,7 +314,7 @@ public class ArrayCompareConditionTests extends ESTestCase {
expectedException.expect(ElasticsearchParseException.class);
expectedException.expectMessage("unknown comparison quantifier");
ArrayCompareCondition.parse(new ClockMock(), "_id", parser);
ArrayCompareCondition.parse(ClockMock.frozen(), "_id", parser);
}
public void testParseContainsUnexpectedFieldInComparisonOperator() throws IOException {
@ -339,6 +339,6 @@ public class ArrayCompareConditionTests extends ESTestCase {
expectedException.expect(ElasticsearchParseException.class);
expectedException.expectMessage("expected a field indicating the comparison value or comparison quantifier");
ArrayCompareCondition.parse(new ClockMock(), "_id", parser);
ArrayCompareCondition.parse(ClockMock.frozen(), "_id", parser);
}
}

View File

@ -134,7 +134,7 @@ public class CompareConditionTests extends ESTestCase {
}
public void testExecuteDateMath() throws Exception {
ClockMock clock = new ClockMock();
ClockMock clock = ClockMock.frozen();
boolean met = randomBoolean();
Op op = met ? randomFrom(CompareCondition.Op.GT, CompareCondition.Op.GTE, CompareCondition.Op.NOT_EQ) :
randomFrom(CompareCondition.Op.LT, CompareCondition.Op.LTE, CompareCondition.Op.EQ);
@ -147,7 +147,7 @@ public class CompareConditionTests extends ESTestCase {
}
public void testExecutePath() throws Exception {
ClockMock clock = new ClockMock();
ClockMock clock = ClockMock.frozen();
boolean met = randomBoolean();
Op op = met ? CompareCondition.Op.EQ : CompareCondition.Op.NOT_EQ;
String value = "{{ctx.payload.value}}";
@ -171,7 +171,7 @@ public class CompareConditionTests extends ESTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
CompareCondition condition = (CompareCondition) CompareCondition.parse(new ClockMock(), "_id", parser);
CompareCondition condition = (CompareCondition) CompareCondition.parse(ClockMock.frozen(), "_id", parser);
assertThat(condition, notNullValue());
assertThat(condition.getPath(), is("key1.key2"));
@ -188,7 +188,7 @@ public class CompareConditionTests extends ESTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
try {
CompareCondition.parse(new ClockMock(), "_id", parser);
CompareCondition.parse(ClockMock.frozen(), "_id", parser);
fail("Expected ElasticsearchParseException");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("expected an object but found [null] instead"));
@ -207,7 +207,7 @@ public class CompareConditionTests extends ESTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
try {
CompareCondition.parse(new ClockMock(), "_id", parser);
CompareCondition.parse(ClockMock.frozen(), "_id", parser);
fail("Expected ElasticsearchParseException");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("unknown comparison operator [foobar]"));
@ -227,7 +227,7 @@ public class CompareConditionTests extends ESTestCase {
XContentParser parser = JsonXContent.jsonXContent.createParser(builder.bytes());
parser.nextToken();
try {
CompareCondition.parse(new ClockMock(), "_id", parser);
CompareCondition.parse(ClockMock.frozen(), "_id", parser);
fail("Expected ElasticsearchParseException");
} catch (ElasticsearchParseException e) {
assertThat(e.getMessage(), containsString("must either be a numeric, string, boolean or null value, but found ["));

View File

@ -98,7 +98,7 @@ public class ExecutionServiceTests extends ESTestCase {
when(executor.queue()).thenReturn(new ArrayBlockingQueue<>(1));
watchLockService = mock(WatchLockService.class);
clock = new ClockMock();
clock = ClockMock.frozen();
threadPool = mock(ThreadPool.class);
executionService = new ExecutionService(Settings.EMPTY, historyStore, triggeredWatchStore, executor, watchStore,
watchLockService, clock, threadPool);

View File

@ -41,6 +41,7 @@ import org.elasticsearch.test.ESIntegTestCase;
import org.elasticsearch.test.ESIntegTestCase.ClusterScope;
import org.elasticsearch.test.InternalTestCluster;
import org.elasticsearch.test.TestCluster;
import org.elasticsearch.test.disruption.ServiceDisruptionScheme;
import org.elasticsearch.test.store.MockFSIndexStore;
import org.elasticsearch.test.transport.MockTransportService;
import org.elasticsearch.xpack.TimeWarpedXPackPlugin;
@ -281,6 +282,13 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
private void startWatcherIfNodesExist() throws Exception {
if (internalCluster().size() > 0) {
ensureLicenseEnabled();
if (timeWarped()) {
// now that the license is enabled and valid we can freeze all nodes clocks
logger.info("[{}#{}]: freezing time on nodes", getTestClass().getSimpleName(), getTestName());
TimeFreezeDisruption ice = new TimeFreezeDisruption();
internalCluster().setDisruptionScheme(ice);
ice.startDisrupting();
}
WatcherState state = getInstanceFromMaster(WatcherService.class).state();
if (state == WatcherState.STOPPED) {
logger.info("[{}#{}]: starting watcher", getTestClass().getSimpleName(), getTestName());
@ -585,12 +593,6 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
}
protected void ensureLicenseEnabled() throws Exception {
if (timeWarped()) {
// the master generates a license which starts now. We have to make sure all nodes
// advance their time so that the license will be valid
progressClocksAboveMaster(internalCluster());
}
assertBusy(() -> {
for (XPackLicenseState licenseState : internalCluster().getInstances(XPackLicenseState.class)) {
assertThat(licenseState.isWatcherAllowed(), is(true));
@ -797,4 +799,70 @@ public abstract class AbstractWatcherIntegrationTestCase extends ESIntegTestCase
}
}
/**
* A disruption that prevents time from advancing on nodes. This is needed to allow time sensitive tests
* to have full control of time. This disruption requires {@link ClockMock} being available on the nodes.
*/
private static class TimeFreezeDisruption implements ServiceDisruptionScheme {
private InternalTestCluster cluster;
private boolean frozen;
@Override
public void applyToCluster(InternalTestCluster cluster) {
this.cluster = cluster;
}
@Override
public void removeFromCluster(InternalTestCluster cluster) {
stopDisrupting();
}
@Override
public void removeAndEnsureHealthy(InternalTestCluster cluster) {
stopDisrupting();
}
@Override
public synchronized void applyToNode(String node, InternalTestCluster cluster) {
if (frozen) {
((ClockMock)cluster.getInstance(Clock.class, node)).freeze();
}
}
@Override
public void removeFromNode(String node, InternalTestCluster cluster) {
((ClockMock)cluster.getInstance(Clock.class, node)).unfreeze();
}
@Override
public synchronized void startDisrupting() {
frozen = true;
for (String node: cluster.getNodeNames()) {
applyToNode(node, cluster);
}
}
@Override
public void stopDisrupting() {
frozen = false;
for (String node: cluster.getNodeNames()) {
removeFromNode(node, cluster);
}
}
@Override
public void testClusterClosed() {
}
@Override
public TimeValue expectedTimeToHeal() {
return TimeValue.ZERO;
}
@Override
public String toString() {
return "time frozen";
}
}
}

View File

@ -83,7 +83,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTestCase {
public void testSimpleFailure() throws Exception {
// we need 3 hosts here because we stop the master and start another - it doesn't restart the pre-existing node...
config = new ClusterDiscoveryConfiguration.UnicastZen(3, Settings.EMPTY);
internalCluster().startNodesAsync(2).get();
internalCluster().startNodes(2);
createIndex("my-index");
ensureWatcherStarted(false);
@ -135,16 +135,8 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTestCase {
public void testDedicatedMasterNodeLayout() throws Exception {
// Only the master nodes are in the unicast nodes list:
config = new ClusterDiscoveryConfiguration.UnicastZen(11, 3, Settings.EMPTY);
Settings settings = Settings.builder()
.put("node.data", false)
.put("node.master", true)
.build();
internalCluster().startNodesAsync(3, settings).get();
settings = Settings.builder()
.put("node.data", true)
.put("node.master", false)
.build();
internalCluster().startNodesAsync(7, settings).get();
internalCluster().startMasterOnlyNodes(3);
internalCluster().startDataOnlyNodes(7);
ensureWatcherStarted(false);
ensureLicenseEnabled();
@ -189,7 +181,7 @@ public class NoMasterNodeTests extends AbstractWatcherIntegrationTestCase {
int numberOfWatches = scaledRandomIntBetween(numberOfFailures, 12);
logger.info("number of failures [{}], number of watches [{}]", numberOfFailures, numberOfWatches);
config = new ClusterDiscoveryConfiguration.UnicastZen(2 + numberOfFailures, Settings.EMPTY);
internalCluster().startNodesAsync(2).get();
internalCluster().startNodes(2);
createIndex("my-index");
client().prepareIndex("my-index", "my-type").setSource("field", "value").get();

View File

@ -34,7 +34,7 @@ import static org.joda.time.DateTimeZone.UTC;
public abstract class BaseTriggerEngineTestCase extends ESTestCase {
private TriggerEngine engine;
protected ClockMock clock = new ClockMock();
protected ClockMock clock = ClockMock.frozen();
@Before
public void init() throws Exception {

View File

@ -166,7 +166,7 @@ public class WatchTests extends ESTestCase {
public void testParserSelfGenerated() throws Exception {
DateTime now = new DateTime(UTC);
ClockMock clock = new ClockMock();
ClockMock clock = ClockMock.frozen();
clock.setTime(now);
TransformRegistry transformRegistry = transformRegistry();
boolean includeStatus = randomBoolean();
@ -219,7 +219,7 @@ public class WatchTests extends ESTestCase {
}
public void testParserBadActions() throws Exception {
ClockMock clock = new ClockMock();
ClockMock clock = ClockMock.frozen();
ScheduleRegistry scheduleRegistry = registry(randomSchedule());
TriggerEngine triggerEngine = new ParseOnlyScheduleTriggerEngine(Settings.EMPTY, scheduleRegistry, clock);
TriggerService triggerService = new TriggerService(Settings.EMPTY, singleton(triggerEngine));
@ -434,7 +434,7 @@ public class WatchTests extends ESTestCase {
parsers.put(ArrayCompareCondition.TYPE, (c, id, p) -> ArrayCompareCondition.parse(c, id, p));
parsers.put(CompareCondition.TYPE, (c, id, p) -> CompareCondition.parse(c, id, p));
parsers.put(ScriptCondition.TYPE, (c, id, p) -> ScriptCondition.parse(scriptService, id, p));
return new ConditionRegistry(parsers, new ClockMock());
return new ConditionRegistry(parsers, ClockMock.frozen());
}
private ExecutableTransform randomTransform() {