Refactor Sniffer and make it testable (#29638)
This commit reworks the Sniffer component to simplify it and make it possible to test it. In particular, it no longer takes out the host that failed when sniffing on failure, but rather relies on whatever the cluster returns. This is the result of some valid comments from #27985. Taking out one single host is too naive, hard to test and debug. A new Scheduler abstraction is introduced to abstract the tasks scheduling away and make it possible to plug in any test implementation and take out timing aspects when testing. Concurrency aspects have also been improved, synchronized methods are no longer required. At the same time, we were able to take #27697 and #25701 into account and fix them, especially now that we can more easily add tests. Last but not least, unit tests are added for the Sniffer component, long overdue. Closes #27697 Closes #25701
This commit is contained in:
parent
4777d8a2df
commit
63f3a61134
|
@ -61,6 +61,7 @@ import java.util.Comparator;
|
|||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
import java.util.LinkedHashSet;
|
||||
import java.util.List;
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
|
@ -132,7 +133,7 @@ public class RestClient implements Closeable {
|
|||
if (hosts == null || hosts.length == 0) {
|
||||
throw new IllegalArgumentException("hosts must not be null nor empty");
|
||||
}
|
||||
Set<HttpHost> httpHosts = new HashSet<>();
|
||||
Set<HttpHost> httpHosts = new LinkedHashSet<>();
|
||||
AuthCache authCache = new BasicAuthCache();
|
||||
for (HttpHost host : hosts) {
|
||||
Objects.requireNonNull(host, "host cannot be null");
|
||||
|
@ -143,6 +144,13 @@ public class RestClient implements Closeable {
|
|||
this.blacklist.clear();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the configured hosts
|
||||
*/
|
||||
public List<HttpHost> getHosts() {
|
||||
return new ArrayList<>(hostTuple.hosts);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a request to the Elasticsearch cluster that the client points to.
|
||||
* Blocks until the request is completed and returns its response or fails
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
|||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
@ -251,6 +252,37 @@ public class RestClientTests extends RestClientTestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void testSetHostsPreservesOrdering() throws Exception {
|
||||
try (RestClient restClient = createRestClient()) {
|
||||
HttpHost[] hosts = randomHosts();
|
||||
restClient.setHosts(hosts);
|
||||
assertEquals(Arrays.asList(hosts), restClient.getHosts());
|
||||
}
|
||||
}
|
||||
|
||||
private static HttpHost[] randomHosts() {
|
||||
int numHosts = randomIntBetween(1, 10);
|
||||
HttpHost[] hosts = new HttpHost[numHosts];
|
||||
for (int i = 0; i < hosts.length; i++) {
|
||||
hosts[i] = new HttpHost("host-" + i, 9200);
|
||||
}
|
||||
return hosts;
|
||||
}
|
||||
|
||||
public void testSetHostsDuplicatedHosts() throws Exception {
|
||||
try (RestClient restClient = createRestClient()) {
|
||||
int numHosts = randomIntBetween(1, 10);
|
||||
HttpHost[] hosts = new HttpHost[numHosts];
|
||||
HttpHost host = new HttpHost("host", 9200);
|
||||
for (int i = 0; i < hosts.length; i++) {
|
||||
hosts[i] = host;
|
||||
}
|
||||
restClient.setHosts(hosts);
|
||||
assertEquals(1, restClient.getHosts().size());
|
||||
assertEquals(host, restClient.getHosts().get(0));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @deprecated will remove method in 7.0 but needs tests until then. Replaced by {@link RequestTests#testConstructor()}.
|
||||
*/
|
||||
|
|
|
@ -58,7 +58,6 @@ public class SniffOnFailureListener extends RestClient.FailureListener {
|
|||
if (sniffer == null) {
|
||||
throw new IllegalStateException("sniffer was not set, unable to sniff on failure");
|
||||
}
|
||||
//re-sniff immediately but take out the node that failed
|
||||
sniffer.sniffOnFailure(host);
|
||||
sniffer.sniffOnFailure();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,12 +31,14 @@ import java.security.AccessController;
|
|||
import java.security.PrivilegedAction;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.ThreadFactory;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/**
|
||||
* Class responsible for sniffing nodes from some source (default is elasticsearch itself) and setting them to a provided instance of
|
||||
|
@ -51,101 +53,175 @@ public class Sniffer implements Closeable {
|
|||
private static final Log logger = LogFactory.getLog(Sniffer.class);
|
||||
private static final String SNIFFER_THREAD_NAME = "es_rest_client_sniffer";
|
||||
|
||||
private final Task task;
|
||||
private final HostsSniffer hostsSniffer;
|
||||
private final RestClient restClient;
|
||||
private final long sniffIntervalMillis;
|
||||
private final long sniffAfterFailureDelayMillis;
|
||||
private final Scheduler scheduler;
|
||||
private final AtomicBoolean initialized = new AtomicBoolean(false);
|
||||
private volatile ScheduledTask nextScheduledTask;
|
||||
|
||||
Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay) {
|
||||
this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay);
|
||||
this(restClient, hostsSniffer, new DefaultScheduler(), sniffInterval, sniffAfterFailureDelay);
|
||||
}
|
||||
|
||||
Sniffer(RestClient restClient, HostsSniffer hostsSniffer, Scheduler scheduler, long sniffInterval, long sniffAfterFailureDelay) {
|
||||
this.hostsSniffer = hostsSniffer;
|
||||
this.restClient = restClient;
|
||||
this.sniffIntervalMillis = sniffInterval;
|
||||
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelay;
|
||||
this.scheduler = scheduler;
|
||||
/*
|
||||
* The first sniffing round is async, so this constructor returns before nextScheduledTask is assigned to a task.
|
||||
* The initialized flag is a protection against NPE due to that.
|
||||
*/
|
||||
Task task = new Task(sniffIntervalMillis) {
|
||||
@Override
|
||||
public void run() {
|
||||
super.run();
|
||||
initialized.compareAndSet(false, true);
|
||||
}
|
||||
};
|
||||
/*
|
||||
* We do not keep track of the returned future as we never intend to cancel the initial sniffing round, we rather
|
||||
* prevent any other operation from being executed till the sniffer is properly initialized
|
||||
*/
|
||||
scheduler.schedule(task, 0L);
|
||||
}
|
||||
|
||||
/**
|
||||
* Triggers a new sniffing round and explicitly takes out the failed host provided as argument
|
||||
* Schedule sniffing to run as soon as possible if it isn't already running. Once such sniffing round runs
|
||||
* it will also schedule a new round after sniffAfterFailureDelay ms.
|
||||
*/
|
||||
public void sniffOnFailure(HttpHost failedHost) {
|
||||
this.task.sniffOnFailure(failedHost);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
task.shutdown();
|
||||
}
|
||||
|
||||
private static class Task implements Runnable {
|
||||
private final HostsSniffer hostsSniffer;
|
||||
private final RestClient restClient;
|
||||
|
||||
private final long sniffIntervalMillis;
|
||||
private final long sniffAfterFailureDelayMillis;
|
||||
private final ScheduledExecutorService scheduledExecutorService;
|
||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
|
||||
private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffIntervalMillis, long sniffAfterFailureDelayMillis) {
|
||||
this.hostsSniffer = hostsSniffer;
|
||||
this.restClient = restClient;
|
||||
this.sniffIntervalMillis = sniffIntervalMillis;
|
||||
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis;
|
||||
SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME);
|
||||
this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory);
|
||||
scheduleNextRun(0);
|
||||
}
|
||||
|
||||
synchronized void scheduleNextRun(long delayMillis) {
|
||||
if (scheduledExecutorService.isShutdown() == false) {
|
||||
try {
|
||||
if (scheduledFuture != null) {
|
||||
//regardless of when the next sniff is scheduled, cancel it and schedule a new one with updated delay
|
||||
this.scheduledFuture.cancel(false);
|
||||
}
|
||||
logger.debug("scheduling next sniff in " + delayMillis + " ms");
|
||||
this.scheduledFuture = this.scheduledExecutorService.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
|
||||
} catch(Exception e) {
|
||||
logger.error("error while scheduling next sniffer task", e);
|
||||
}
|
||||
public void sniffOnFailure() {
|
||||
//sniffOnFailure does nothing until the initial sniffing round has been completed
|
||||
if (initialized.get()) {
|
||||
/*
|
||||
* If sniffing is already running, there is no point in scheduling another round right after the current one.
|
||||
* Concurrent calls may be checking the same task state, but only the first skip call on the same task returns true.
|
||||
* The task may also get replaced while we check its state, in which case calling skip on it returns false.
|
||||
*/
|
||||
if (this.nextScheduledTask.skip()) {
|
||||
/*
|
||||
* We do not keep track of this future as the task will immediately run and we don't intend to cancel it
|
||||
* due to concurrent sniffOnFailure runs. Effectively the previous (now cancelled or skipped) task will stay
|
||||
* assigned to nextTask till this onFailure round gets run and schedules its corresponding afterFailure round.
|
||||
*/
|
||||
scheduler.schedule(new Task(sniffAfterFailureDelayMillis), 0L);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
enum TaskState {
|
||||
WAITING, SKIPPED, STARTED
|
||||
}
|
||||
|
||||
class Task implements Runnable {
|
||||
final long nextTaskDelay;
|
||||
final AtomicReference<TaskState> taskState = new AtomicReference<>(TaskState.WAITING);
|
||||
|
||||
Task(long nextTaskDelay) {
|
||||
this.nextTaskDelay = nextTaskDelay;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
sniff(null, sniffIntervalMillis);
|
||||
}
|
||||
|
||||
void sniffOnFailure(HttpHost failedHost) {
|
||||
sniff(failedHost, sniffAfterFailureDelayMillis);
|
||||
}
|
||||
|
||||
void sniff(HttpHost excludeHost, long nextSniffDelayMillis) {
|
||||
if (running.compareAndSet(false, true)) {
|
||||
try {
|
||||
List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
|
||||
logger.debug("sniffed hosts: " + sniffedHosts);
|
||||
if (excludeHost != null) {
|
||||
sniffedHosts.remove(excludeHost);
|
||||
}
|
||||
if (sniffedHosts.isEmpty()) {
|
||||
logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
|
||||
} else {
|
||||
this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
logger.error("error while sniffing nodes", e);
|
||||
} finally {
|
||||
scheduleNextRun(nextSniffDelayMillis);
|
||||
running.set(false);
|
||||
}
|
||||
/*
|
||||
* Skipped or already started tasks do nothing. In most cases tasks will be cancelled and not run, but we want to protect for
|
||||
* cases where future#cancel returns true yet the task runs. We want to make sure that such tasks do nothing otherwise they will
|
||||
* schedule another round at the end and so on, leaving us with multiple parallel sniffing "tracks" whish is undesirable.
|
||||
*/
|
||||
if (taskState.compareAndSet(TaskState.WAITING, TaskState.STARTED) == false) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
||||
synchronized void shutdown() {
|
||||
scheduledExecutorService.shutdown();
|
||||
try {
|
||||
if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
|
||||
return;
|
||||
}
|
||||
scheduledExecutorService.shutdownNow();
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
sniff();
|
||||
} catch (Exception e) {
|
||||
logger.error("error while sniffing nodes", e);
|
||||
} finally {
|
||||
Task task = new Task(sniffIntervalMillis);
|
||||
Future<?> future = scheduler.schedule(task, nextTaskDelay);
|
||||
//tasks are run by a single threaded executor, so swapping is safe with a simple volatile variable
|
||||
ScheduledTask previousTask = nextScheduledTask;
|
||||
nextScheduledTask = new ScheduledTask(task, future);
|
||||
assert initialized.get() == false ||
|
||||
previousTask.task.isSkipped() || previousTask.task.hasStarted() : "task that we are replacing is neither " +
|
||||
"cancelled nor has it ever started";
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the task has started, false in case it didn't start (yet?) or it was skipped
|
||||
*/
|
||||
boolean hasStarted() {
|
||||
return taskState.get() == TaskState.STARTED;
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets this task to be skipped. Returns true if the task will be skipped, false if the task has already started.
|
||||
*/
|
||||
boolean skip() {
|
||||
/*
|
||||
* Threads may still get run although future#cancel returns true. We make sure that a task is either cancelled (or skipped),
|
||||
* or entirely run. In the odd case that future#cancel returns true and the thread still runs, the task won't do anything.
|
||||
* In case future#cancel returns true but the task has already started, this state change will not succeed hence this method
|
||||
* returns false and the task will normally run.
|
||||
*/
|
||||
return taskState.compareAndSet(TaskState.WAITING, TaskState.SKIPPED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns true if the task was set to be skipped before it was started
|
||||
*/
|
||||
boolean isSkipped() {
|
||||
return taskState.get() == TaskState.SKIPPED;
|
||||
}
|
||||
}
|
||||
|
||||
static final class ScheduledTask {
|
||||
final Task task;
|
||||
final Future<?> future;
|
||||
|
||||
ScheduledTask(Task task, Future<?> future) {
|
||||
this.task = task;
|
||||
this.future = future;
|
||||
}
|
||||
|
||||
/**
|
||||
* Cancels this task. Returns true if the task has been successfully cancelled, meaning it won't be executed
|
||||
* or if it is its execution won't have any effect. Returns false if the task cannot be cancelled (possibly it was
|
||||
* already cancelled or already completed).
|
||||
*/
|
||||
boolean skip() {
|
||||
/*
|
||||
* Future#cancel should return false whenever a task cannot be cancelled, most likely as it has already started. We don't
|
||||
* trust it much though so we try to cancel hoping that it will work. At the same time we always call skip too, which means
|
||||
* that if the task has already started the state change will fail. We could potentially not call skip when cancel returns
|
||||
* false but we prefer to stay on the safe side.
|
||||
*/
|
||||
future.cancel(false);
|
||||
return task.skip();
|
||||
}
|
||||
}
|
||||
|
||||
final void sniff() throws IOException {
|
||||
List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
|
||||
if (logger.isDebugEnabled()) {
|
||||
logger.debug("sniffed hosts: " + sniffedHosts);
|
||||
}
|
||||
if (sniffedHosts.isEmpty()) {
|
||||
logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
|
||||
} else {
|
||||
restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
if (initialized.get()) {
|
||||
nextScheduledTask.skip();
|
||||
}
|
||||
this.scheduler.shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -158,8 +234,62 @@ public class Sniffer implements Closeable {
|
|||
return new SnifferBuilder(restClient);
|
||||
}
|
||||
|
||||
private static class SnifferThreadFactory implements ThreadFactory {
|
||||
/**
|
||||
* The Scheduler interface allows to isolate the sniffing scheduling aspects so that we can test
|
||||
* the sniffer by injecting when needed a custom scheduler that is more suited for testing.
|
||||
*/
|
||||
interface Scheduler {
|
||||
/**
|
||||
* Schedules the provided {@link Runnable} to be executed in <code>delayMillis</code> milliseconds
|
||||
*/
|
||||
Future<?> schedule(Task task, long delayMillis);
|
||||
|
||||
/**
|
||||
* Shuts this scheduler down
|
||||
*/
|
||||
void shutdown();
|
||||
}
|
||||
|
||||
/**
|
||||
* Default implementation of {@link Scheduler}, based on {@link ScheduledExecutorService}
|
||||
*/
|
||||
static final class DefaultScheduler implements Scheduler {
|
||||
final ScheduledExecutorService executor;
|
||||
|
||||
DefaultScheduler() {
|
||||
this(initScheduledExecutorService());
|
||||
}
|
||||
|
||||
DefaultScheduler(ScheduledExecutorService executor) {
|
||||
this.executor = executor;
|
||||
}
|
||||
|
||||
private static ScheduledExecutorService initScheduledExecutorService() {
|
||||
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new SnifferThreadFactory(SNIFFER_THREAD_NAME));
|
||||
executor.setRemoveOnCancelPolicy(true);
|
||||
return executor;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Future<?> schedule(Task task, long delayMillis) {
|
||||
return executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
executor.shutdown();
|
||||
try {
|
||||
if (executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
|
||||
return;
|
||||
}
|
||||
executor.shutdownNow();
|
||||
} catch (InterruptedException ignore) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
static class SnifferThreadFactory implements ThreadFactory {
|
||||
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||
private final String namePrefix;
|
||||
private final ThreadFactory originalThreadFactory;
|
||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.client.sniff;
|
|||
|
||||
import org.apache.http.HttpHost;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
|
@ -30,7 +29,7 @@ import java.util.List;
|
|||
*/
|
||||
class MockHostsSniffer implements HostsSniffer {
|
||||
@Override
|
||||
public List<HttpHost> sniffHosts() throws IOException {
|
||||
public List<HttpHost> sniffHosts() {
|
||||
return Collections.singletonList(new HttpHost("localhost", 9200));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,656 @@
|
|||
/*
|
||||
* Licensed to Elasticsearch under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
package org.elasticsearch.client.sniff;
|
||||
|
||||
import org.apache.http.HttpHost;
|
||||
import org.elasticsearch.client.RestClient;
|
||||
import org.elasticsearch.client.RestClientTestCase;
|
||||
import org.elasticsearch.client.sniff.Sniffer.DefaultScheduler;
|
||||
import org.elasticsearch.client.sniff.Sniffer.Scheduler;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.CancellationException;
|
||||
import java.util.concurrent.CopyOnWriteArraySet;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
import static org.hamcrest.CoreMatchers.equalTo;
|
||||
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.assertSame;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Matchers.any;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.times;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
public class SnifferTests extends RestClientTestCase {
|
||||
|
||||
/**
|
||||
* Tests the {@link Sniffer#sniff()} method in isolation. Verifies that it uses the {@link HostsSniffer} implementation
|
||||
* to retrieve nodes and set them (when not empty) to the provided {@link RestClient} instance.
|
||||
*/
|
||||
public void testSniff() throws IOException {
|
||||
HttpHost initialHost = new HttpHost("localhost", 9200);
|
||||
try (RestClient restClient = RestClient.builder(initialHost).build()) {
|
||||
Scheduler noOpScheduler = new Scheduler() {
|
||||
@Override
|
||||
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||
return mock(Future.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
|
||||
}
|
||||
};
|
||||
CountingHostsSniffer hostsSniffer = new CountingHostsSniffer();
|
||||
int iters = randomIntBetween(5, 30);
|
||||
try (Sniffer sniffer = new Sniffer(restClient, hostsSniffer, noOpScheduler, 1000L, -1)){
|
||||
{
|
||||
assertEquals(1, restClient.getHosts().size());
|
||||
HttpHost httpHost = restClient.getHosts().get(0);
|
||||
assertEquals("localhost", httpHost.getHostName());
|
||||
assertEquals(9200, httpHost.getPort());
|
||||
}
|
||||
int emptyList = 0;
|
||||
int failures = 0;
|
||||
int runs = 0;
|
||||
List<HttpHost> lastHosts = Collections.singletonList(initialHost);
|
||||
for (int i = 0; i < iters; i++) {
|
||||
try {
|
||||
runs++;
|
||||
sniffer.sniff();
|
||||
if (hostsSniffer.failures.get() > failures) {
|
||||
failures++;
|
||||
fail("should have failed given that hostsSniffer says it threw an exception");
|
||||
} else if (hostsSniffer.emptyList.get() > emptyList) {
|
||||
emptyList++;
|
||||
assertEquals(lastHosts, restClient.getHosts());
|
||||
} else {
|
||||
assertNotEquals(lastHosts, restClient.getHosts());
|
||||
List<HttpHost> expectedHosts = CountingHostsSniffer.buildHosts(runs);
|
||||
assertEquals(expectedHosts, restClient.getHosts());
|
||||
lastHosts = restClient.getHosts();
|
||||
}
|
||||
} catch(IOException e) {
|
||||
if (hostsSniffer.failures.get() > failures) {
|
||||
failures++;
|
||||
assertEquals("communication breakdown", e.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
assertEquals(hostsSniffer.emptyList.get(), emptyList);
|
||||
assertEquals(hostsSniffer.failures.get(), failures);
|
||||
assertEquals(hostsSniffer.runs.get(), runs);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Test multiple sniffing rounds by mocking the {@link Scheduler} as well as the {@link HostsSniffer}.
|
||||
* Simulates the ordinary behaviour of {@link Sniffer} when sniffing on failure is not enabled.
|
||||
* The {@link CountingHostsSniffer} doesn't make any network connection but may throw exception or return no hosts, which makes
|
||||
* it possible to verify that errors are properly handled and don't affect subsequent runs and their scheduling.
|
||||
* The {@link Scheduler} implementation submits rather than scheduling tasks, meaning that it doesn't respect the requested sniff
|
||||
* delays while allowing to assert that the requested delays for each requested run and the following one are the expected values.
|
||||
*/
|
||||
public void testOrdinarySniffRounds() throws Exception {
|
||||
final long sniffInterval = randomLongBetween(1, Long.MAX_VALUE);
|
||||
long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE);
|
||||
RestClient restClient = mock(RestClient.class);
|
||||
CountingHostsSniffer hostsSniffer = new CountingHostsSniffer();
|
||||
final int iters = randomIntBetween(30, 100);
|
||||
final Set<Future<?>> futures = new CopyOnWriteArraySet<>();
|
||||
final CountDownLatch completionLatch = new CountDownLatch(1);
|
||||
final AtomicInteger runs = new AtomicInteger(iters);
|
||||
final ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||
final AtomicReference<Future<?>> lastFuture = new AtomicReference<>();
|
||||
final AtomicReference<Sniffer.Task> lastTask = new AtomicReference<>();
|
||||
Scheduler scheduler = new Scheduler() {
|
||||
@Override
|
||||
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||
assertEquals(sniffInterval, task.nextTaskDelay);
|
||||
int numberOfRuns = runs.getAndDecrement();
|
||||
if (numberOfRuns == iters) {
|
||||
//the first call is to schedule the first sniff round from the Sniffer constructor, with delay O
|
||||
assertEquals(0L, delayMillis);
|
||||
assertEquals(sniffInterval, task.nextTaskDelay);
|
||||
} else {
|
||||
//all of the subsequent times "schedule" is called with delay set to the configured sniff interval
|
||||
assertEquals(sniffInterval, delayMillis);
|
||||
assertEquals(sniffInterval, task.nextTaskDelay);
|
||||
if (numberOfRuns == 0) {
|
||||
completionLatch.countDown();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
//we submit rather than scheduling to make the test quick and not depend on time
|
||||
Future<?> future = executor.submit(task);
|
||||
futures.add(future);
|
||||
if (numberOfRuns == 1) {
|
||||
lastFuture.set(future);
|
||||
lastTask.set(task);
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
//the executor is closed externally, shutdown is tested separately
|
||||
}
|
||||
};
|
||||
try {
|
||||
new Sniffer(restClient, hostsSniffer, scheduler, sniffInterval, sniffAfterFailureDelay);
|
||||
assertTrue("timeout waiting for sniffing rounds to be completed", completionLatch.await(1000, TimeUnit.MILLISECONDS));
|
||||
assertEquals(iters, futures.size());
|
||||
//the last future is the only one that may not be completed yet, as the count down happens
|
||||
//while scheduling the next round which is still part of the execution of the runnable itself.
|
||||
assertTrue(lastTask.get().hasStarted());
|
||||
lastFuture.get().get();
|
||||
for (Future<?> future : futures) {
|
||||
assertTrue(future.isDone());
|
||||
future.get();
|
||||
}
|
||||
} finally {
|
||||
executor.shutdown();
|
||||
assertTrue(executor.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
int totalRuns = hostsSniffer.runs.get();
|
||||
assertEquals(iters, totalRuns);
|
||||
int setHostsRuns = totalRuns - hostsSniffer.failures.get() - hostsSniffer.emptyList.get();
|
||||
verify(restClient, times(setHostsRuns)).setHosts(Matchers.<HttpHost>anyVararg());
|
||||
verifyNoMoreInteractions(restClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test that {@link Sniffer#close()} shuts down the underlying {@link Scheduler}, and that such calls are idempotent.
|
||||
* Also verifies that the next scheduled round gets cancelled.
|
||||
*/
|
||||
public void testClose() {
|
||||
final Future<?> future = mock(Future.class);
|
||||
long sniffInterval = randomLongBetween(1, Long.MAX_VALUE);
|
||||
long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE);
|
||||
RestClient restClient = mock(RestClient.class);
|
||||
final AtomicInteger shutdown = new AtomicInteger(0);
|
||||
final AtomicBoolean initialized = new AtomicBoolean(false);
|
||||
Scheduler scheduler = new Scheduler() {
|
||||
@Override
|
||||
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||
if (initialized.compareAndSet(false, true)) {
|
||||
//run from the same thread so the sniffer gets for sure initialized and the scheduled task gets cancelled on close
|
||||
task.run();
|
||||
}
|
||||
return future;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
shutdown.incrementAndGet();
|
||||
}
|
||||
};
|
||||
|
||||
Sniffer sniffer = new Sniffer(restClient, new MockHostsSniffer(), scheduler, sniffInterval, sniffAfterFailureDelay);
|
||||
assertEquals(0, shutdown.get());
|
||||
int iters = randomIntBetween(3, 10);
|
||||
for (int i = 1; i <= iters; i++) {
|
||||
sniffer.close();
|
||||
verify(future, times(i)).cancel(false);
|
||||
assertEquals(i, shutdown.get());
|
||||
}
|
||||
}
|
||||
|
||||
public void testSniffOnFailureNotInitialized() {
|
||||
RestClient restClient = mock(RestClient.class);
|
||||
CountingHostsSniffer hostsSniffer = new CountingHostsSniffer();
|
||||
long sniffInterval = randomLongBetween(1, Long.MAX_VALUE);
|
||||
long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE);
|
||||
final AtomicInteger scheduleCalls = new AtomicInteger(0);
|
||||
Scheduler scheduler = new Scheduler() {
|
||||
@Override
|
||||
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||
scheduleCalls.incrementAndGet();
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
}
|
||||
};
|
||||
|
||||
Sniffer sniffer = new Sniffer(restClient, hostsSniffer, scheduler, sniffInterval, sniffAfterFailureDelay);
|
||||
for (int i = 0; i < 10; i++) {
|
||||
sniffer.sniffOnFailure();
|
||||
}
|
||||
assertEquals(1, scheduleCalls.get());
|
||||
int totalRuns = hostsSniffer.runs.get();
|
||||
assertEquals(0, totalRuns);
|
||||
int setHostsRuns = totalRuns - hostsSniffer.failures.get() - hostsSniffer.emptyList.get();
|
||||
verify(restClient, times(setHostsRuns)).setHosts(Matchers.<HttpHost>anyVararg());
|
||||
verifyNoMoreInteractions(restClient);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test behaviour when a bunch of onFailure sniffing rounds are triggered in parallel. Each run will always
|
||||
* schedule a subsequent afterFailure round. Also, for each onFailure round that starts, the net scheduled round
|
||||
* (either afterFailure or ordinary) gets cancelled.
|
||||
*/
|
||||
public void testSniffOnFailure() throws Exception {
|
||||
RestClient restClient = mock(RestClient.class);
|
||||
CountingHostsSniffer hostsSniffer = new CountingHostsSniffer();
|
||||
final AtomicBoolean initializing = new AtomicBoolean(true);
|
||||
final long sniffInterval = randomLongBetween(1, Long.MAX_VALUE);
|
||||
final long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE);
|
||||
int minNumOnFailureRounds = randomIntBetween(5, 10);
|
||||
final CountDownLatch initializingLatch = new CountDownLatch(1);
|
||||
final Set<Sniffer.ScheduledTask> ordinaryRoundsTasks = new CopyOnWriteArraySet<>();
|
||||
final AtomicReference<Future<?>> initializingFuture = new AtomicReference<>();
|
||||
final Set<Sniffer.ScheduledTask> onFailureTasks = new CopyOnWriteArraySet<>();
|
||||
final Set<Sniffer.ScheduledTask> afterFailureTasks = new CopyOnWriteArraySet<>();
|
||||
final AtomicBoolean onFailureCompleted = new AtomicBoolean(false);
|
||||
final CountDownLatch completionLatch = new CountDownLatch(1);
|
||||
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||
try {
|
||||
Scheduler scheduler = new Scheduler() {
|
||||
@Override
|
||||
public Future<?> schedule(final Sniffer.Task task, long delayMillis) {
|
||||
if (initializing.compareAndSet(true, false)) {
|
||||
assertEquals(0L, delayMillis);
|
||||
Future<?> future = executor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
task.run();
|
||||
} finally {
|
||||
//we need to make sure that the sniffer is initialized, so the sniffOnFailure
|
||||
//call does what it needs to do. Otherwise nothing happens until initialized.
|
||||
initializingLatch.countDown();
|
||||
}
|
||||
}
|
||||
});
|
||||
assertTrue(initializingFuture.compareAndSet(null, future));
|
||||
return future;
|
||||
}
|
||||
if (delayMillis == 0L) {
|
||||
Future<?> future = executor.submit(task);
|
||||
onFailureTasks.add(new Sniffer.ScheduledTask(task, future));
|
||||
return future;
|
||||
}
|
||||
if (delayMillis == sniffAfterFailureDelay) {
|
||||
Future<?> future = scheduleOrSubmit(task);
|
||||
afterFailureTasks.add(new Sniffer.ScheduledTask(task, future));
|
||||
return future;
|
||||
}
|
||||
|
||||
assertEquals(sniffInterval, delayMillis);
|
||||
assertEquals(sniffInterval, task.nextTaskDelay);
|
||||
|
||||
if (onFailureCompleted.get() && onFailureTasks.size() == afterFailureTasks.size()) {
|
||||
completionLatch.countDown();
|
||||
return mock(Future.class);
|
||||
}
|
||||
|
||||
Future<?> future = scheduleOrSubmit(task);
|
||||
ordinaryRoundsTasks.add(new Sniffer.ScheduledTask(task, future));
|
||||
return future;
|
||||
}
|
||||
|
||||
private Future<?> scheduleOrSubmit(Sniffer.Task task) {
|
||||
if (randomBoolean()) {
|
||||
return executor.schedule(task, randomLongBetween(0L, 200L), TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
return executor.submit(task);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
}
|
||||
};
|
||||
final Sniffer sniffer = new Sniffer(restClient, hostsSniffer, scheduler, sniffInterval, sniffAfterFailureDelay);
|
||||
assertTrue("timeout waiting for sniffer to get initialized", initializingLatch.await(1000, TimeUnit.MILLISECONDS));
|
||||
|
||||
ExecutorService onFailureExecutor = Executors.newFixedThreadPool(randomIntBetween(5, 20));
|
||||
Set<Future<?>> onFailureFutures = new CopyOnWriteArraySet<>();
|
||||
try {
|
||||
//with tasks executing quickly one after each other, it is very likely that the onFailure round gets skipped
|
||||
//as another round is already running. We retry till enough runs get through as that's what we want to test.
|
||||
while (onFailureTasks.size() < minNumOnFailureRounds) {
|
||||
onFailureFutures.add(onFailureExecutor.submit(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
sniffer.sniffOnFailure();
|
||||
}
|
||||
}));
|
||||
}
|
||||
assertThat(onFailureFutures.size(), greaterThanOrEqualTo(minNumOnFailureRounds));
|
||||
for (Future<?> onFailureFuture : onFailureFutures) {
|
||||
assertNull(onFailureFuture.get());
|
||||
}
|
||||
onFailureCompleted.set(true);
|
||||
} finally {
|
||||
onFailureExecutor.shutdown();
|
||||
onFailureExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
assertFalse(initializingFuture.get().isCancelled());
|
||||
assertTrue(initializingFuture.get().isDone());
|
||||
assertNull(initializingFuture.get().get());
|
||||
|
||||
assertTrue("timeout waiting for sniffing rounds to be completed", completionLatch.await(1000, TimeUnit.MILLISECONDS));
|
||||
assertThat(onFailureTasks.size(), greaterThanOrEqualTo(minNumOnFailureRounds));
|
||||
assertEquals(onFailureTasks.size(), afterFailureTasks.size());
|
||||
|
||||
for (Sniffer.ScheduledTask onFailureTask : onFailureTasks) {
|
||||
assertFalse(onFailureTask.future.isCancelled());
|
||||
assertTrue(onFailureTask.future.isDone());
|
||||
assertNull(onFailureTask.future.get());
|
||||
assertTrue(onFailureTask.task.hasStarted());
|
||||
assertFalse(onFailureTask.task.isSkipped());
|
||||
}
|
||||
|
||||
int cancelledTasks = 0;
|
||||
int completedTasks = onFailureTasks.size() + 1;
|
||||
for (Sniffer.ScheduledTask afterFailureTask : afterFailureTasks) {
|
||||
if (assertTaskCancelledOrCompleted(afterFailureTask)) {
|
||||
completedTasks++;
|
||||
} else {
|
||||
cancelledTasks++;
|
||||
}
|
||||
}
|
||||
|
||||
assertThat(ordinaryRoundsTasks.size(), greaterThan(0));
|
||||
for (Sniffer.ScheduledTask task : ordinaryRoundsTasks) {
|
||||
if (assertTaskCancelledOrCompleted(task)) {
|
||||
completedTasks++;
|
||||
} else {
|
||||
cancelledTasks++;
|
||||
}
|
||||
}
|
||||
assertEquals(onFailureTasks.size(), cancelledTasks);
|
||||
|
||||
assertEquals(completedTasks, hostsSniffer.runs.get());
|
||||
int setHostsRuns = hostsSniffer.runs.get() - hostsSniffer.failures.get() - hostsSniffer.emptyList.get();
|
||||
verify(restClient, times(setHostsRuns)).setHosts(Matchers.<HttpHost>anyVararg());
|
||||
verifyNoMoreInteractions(restClient);
|
||||
} finally {
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private static boolean assertTaskCancelledOrCompleted(Sniffer.ScheduledTask task) throws ExecutionException, InterruptedException {
|
||||
if (task.task.isSkipped()) {
|
||||
assertTrue(task.future.isCancelled());
|
||||
try {
|
||||
task.future.get();
|
||||
fail("cancellation exception should have been thrown");
|
||||
} catch(CancellationException ignore) {
|
||||
}
|
||||
return false;
|
||||
} else {
|
||||
try {
|
||||
assertNull(task.future.get());
|
||||
} catch(CancellationException ignore) {
|
||||
assertTrue(task.future.isCancelled());
|
||||
}
|
||||
assertTrue(task.future.isDone());
|
||||
assertTrue(task.task.hasStarted());
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
public void testTaskCancelling() throws Exception {
|
||||
RestClient restClient = mock(RestClient.class);
|
||||
HostsSniffer hostsSniffer = mock(HostsSniffer.class);
|
||||
Scheduler noOpScheduler = new Scheduler() {
|
||||
@Override
|
||||
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
}
|
||||
};
|
||||
Sniffer sniffer = new Sniffer(restClient, hostsSniffer, noOpScheduler, 0L, 0L);
|
||||
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||
try {
|
||||
int numIters = randomIntBetween(50, 100);
|
||||
for (int i = 0; i < numIters; i++) {
|
||||
Sniffer.Task task = sniffer.new Task(0L);
|
||||
TaskWrapper wrapper = new TaskWrapper(task);
|
||||
Future<?> future;
|
||||
if (rarely()) {
|
||||
future = executor.schedule(wrapper, randomLongBetween(0L, 200L), TimeUnit.MILLISECONDS);
|
||||
} else {
|
||||
future = executor.submit(wrapper);
|
||||
}
|
||||
Sniffer.ScheduledTask scheduledTask = new Sniffer.ScheduledTask(task, future);
|
||||
boolean skip = scheduledTask.skip();
|
||||
try {
|
||||
assertNull(future.get());
|
||||
} catch(CancellationException ignore) {
|
||||
assertTrue(future.isCancelled());
|
||||
}
|
||||
|
||||
if (skip) {
|
||||
//the task was either cancelled before starting, in which case it will never start (thanks to Future#cancel),
|
||||
//or skipped, in which case it will run but do nothing (thanks to Task#skip).
|
||||
//Here we want to make sure that whenever skip returns true, the task either won't run or it won't do anything,
|
||||
//otherwise we may end up with parallel sniffing tracks given that each task schedules the following one. We need to
|
||||
// make sure that onFailure takes scheduling over while at the same time ordinary rounds don't go on.
|
||||
assertFalse(task.hasStarted());
|
||||
assertTrue(task.isSkipped());
|
||||
assertTrue(future.isCancelled());
|
||||
assertTrue(future.isDone());
|
||||
} else {
|
||||
//if a future is cancelled when its execution has already started, future#get throws CancellationException before
|
||||
//completion. The execution continues though so we use a latch to try and wait for the task to be completed.
|
||||
//Here we want to make sure that whenever skip returns false, the task will be completed, otherwise we may be
|
||||
//missing to schedule the following round, which means no sniffing will ever happen again besides on failure sniffing.
|
||||
assertTrue(wrapper.await());
|
||||
//the future may or may not be cancelled but the task has for sure started and completed
|
||||
assertTrue(task.toString(), task.hasStarted());
|
||||
assertFalse(task.isSkipped());
|
||||
assertTrue(future.isDone());
|
||||
}
|
||||
//subsequent cancel calls return false for sure
|
||||
int cancelCalls = randomIntBetween(1, 10);
|
||||
for (int j = 0; j < cancelCalls; j++) {
|
||||
assertFalse(scheduledTask.skip());
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
executor.shutdown();
|
||||
executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Wraps a {@link Sniffer.Task} and allows to wait for its completion. This is needed to verify
|
||||
* that tasks are either never started or always completed. Calling {@link Future#get()} against a cancelled future will
|
||||
* throw {@link CancellationException} straight-away but the execution of the task will continue if it had already started,
|
||||
* in which case {@link Future#cancel(boolean)} returns true which is not very helpful.
|
||||
*/
|
||||
private static final class TaskWrapper implements Runnable {
|
||||
final Sniffer.Task task;
|
||||
final CountDownLatch completionLatch = new CountDownLatch(1);
|
||||
|
||||
TaskWrapper(Sniffer.Task task) {
|
||||
this.task = task;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
task.run();
|
||||
} finally {
|
||||
completionLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
boolean await() throws InterruptedException {
|
||||
return completionLatch.await(1000, TimeUnit.MILLISECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Mock {@link HostsSniffer} implementation used for testing, which most of the times return a fixed host.
|
||||
* It rarely throws exception or return an empty list of hosts, to make sure that such situations are properly handled.
|
||||
* It also asserts that it never gets called concurrently, based on the assumption that only one sniff run can be run
|
||||
* at a given point in time.
|
||||
*/
|
||||
private static class CountingHostsSniffer implements HostsSniffer {
|
||||
private final AtomicInteger runs = new AtomicInteger(0);
|
||||
private final AtomicInteger failures = new AtomicInteger(0);
|
||||
private final AtomicInteger emptyList = new AtomicInteger(0);
|
||||
|
||||
@Override
|
||||
public List<HttpHost> sniffHosts() throws IOException {
|
||||
int run = runs.incrementAndGet();
|
||||
if (rarely()) {
|
||||
failures.incrementAndGet();
|
||||
//check that if communication breaks, sniffer keeps on working
|
||||
throw new IOException("communication breakdown");
|
||||
}
|
||||
if (rarely()) {
|
||||
emptyList.incrementAndGet();
|
||||
return Collections.emptyList();
|
||||
}
|
||||
return buildHosts(run);
|
||||
}
|
||||
|
||||
private static List<HttpHost> buildHosts(int run) {
|
||||
int size = run % 5 + 1;
|
||||
assert size > 0;
|
||||
List<HttpHost> hosts = new ArrayList<>(size);
|
||||
for (int i = 0; i < size; i++) {
|
||||
hosts.add(new HttpHost("sniffed-" + run, 9200 + i));
|
||||
}
|
||||
return hosts;
|
||||
}
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
public void testDefaultSchedulerSchedule() {
|
||||
RestClient restClient = mock(RestClient.class);
|
||||
HostsSniffer hostsSniffer = mock(HostsSniffer.class);
|
||||
Scheduler noOpScheduler = new Scheduler() {
|
||||
@Override
|
||||
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||
return mock(Future.class);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void shutdown() {
|
||||
|
||||
}
|
||||
};
|
||||
Sniffer sniffer = new Sniffer(restClient, hostsSniffer, noOpScheduler, 0L, 0L);
|
||||
Sniffer.Task task = sniffer.new Task(randomLongBetween(1, Long.MAX_VALUE));
|
||||
|
||||
ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
|
||||
final ScheduledFuture<?> mockedFuture = mock(ScheduledFuture.class);
|
||||
when(scheduledExecutorService.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class)))
|
||||
.then(new Answer<ScheduledFuture<?>>() {
|
||||
@Override
|
||||
public ScheduledFuture<?> answer(InvocationOnMock invocationOnMock) {
|
||||
return mockedFuture;
|
||||
}
|
||||
});
|
||||
DefaultScheduler scheduler = new DefaultScheduler(scheduledExecutorService);
|
||||
long delay = randomLongBetween(1, Long.MAX_VALUE);
|
||||
Future<?> future = scheduler.schedule(task, delay);
|
||||
assertSame(mockedFuture, future);
|
||||
verify(scheduledExecutorService).schedule(task, delay, TimeUnit.MILLISECONDS);
|
||||
verifyNoMoreInteractions(scheduledExecutorService, mockedFuture);
|
||||
}
|
||||
|
||||
public void testDefaultSchedulerThreadFactory() {
|
||||
DefaultScheduler defaultScheduler = new DefaultScheduler();
|
||||
try {
|
||||
ScheduledExecutorService executorService = defaultScheduler.executor;
|
||||
assertThat(executorService, instanceOf(ScheduledThreadPoolExecutor.class));
|
||||
assertThat(executorService, instanceOf(ScheduledThreadPoolExecutor.class));
|
||||
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) executorService;
|
||||
assertTrue(executor.getRemoveOnCancelPolicy());
|
||||
assertFalse(executor.getContinueExistingPeriodicTasksAfterShutdownPolicy());
|
||||
assertTrue(executor.getExecuteExistingDelayedTasksAfterShutdownPolicy());
|
||||
assertThat(executor.getThreadFactory(), instanceOf(Sniffer.SnifferThreadFactory.class));
|
||||
int iters = randomIntBetween(3, 10);
|
||||
for (int i = 1; i <= iters; i++) {
|
||||
Thread thread = executor.getThreadFactory().newThread(new Runnable() {
|
||||
@Override
|
||||
public void run() {
|
||||
|
||||
}
|
||||
});
|
||||
assertThat(thread.getName(), equalTo("es_rest_client_sniffer[T#" + i + "]"));
|
||||
assertThat(thread.isDaemon(), is(true));
|
||||
}
|
||||
} finally {
|
||||
defaultScheduler.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public void testDefaultSchedulerShutdown() throws Exception {
|
||||
ScheduledThreadPoolExecutor executor = mock(ScheduledThreadPoolExecutor.class);
|
||||
DefaultScheduler defaultScheduler = new DefaultScheduler(executor);
|
||||
defaultScheduler.shutdown();
|
||||
verify(executor).shutdown();
|
||||
verify(executor).awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
verify(executor).shutdownNow();
|
||||
verifyNoMoreInteractions(executor);
|
||||
|
||||
when(executor.awaitTermination(1000, TimeUnit.MILLISECONDS)).thenReturn(true);
|
||||
defaultScheduler.shutdown();
|
||||
verify(executor, times(2)).shutdown();
|
||||
verify(executor, times(2)).awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||
verifyNoMoreInteractions(executor);
|
||||
}
|
||||
}
|
|
@ -41,8 +41,6 @@ import org.elasticsearch.xpack.monitoring.exporter.Exporter;
|
|||
import org.joda.time.format.DateTimeFormatter;
|
||||
|
||||
import javax.net.ssl.SSLContext;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collections;
|
||||
|
@ -658,12 +656,12 @@ public class HttpExporter extends Exporter {
|
|||
if (sniffer != null) {
|
||||
sniffer.close();
|
||||
}
|
||||
} catch (IOException | RuntimeException e) {
|
||||
} catch (Exception e) {
|
||||
logger.error("an error occurred while closing the internal client sniffer", e);
|
||||
} finally {
|
||||
try {
|
||||
client.close();
|
||||
} catch (IOException | RuntimeException e) {
|
||||
} catch (Exception e) {
|
||||
logger.error("an error occurred while closing the internal client", e);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -86,7 +86,7 @@ class NodeFailureListener extends RestClient.FailureListener {
|
|||
resource.markDirty();
|
||||
}
|
||||
if (sniffer != null) {
|
||||
sniffer.sniffOnFailure(host);
|
||||
sniffer.sniffOnFailure();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -46,7 +46,7 @@ public class NodeFailureListenerTests extends ESTestCase {
|
|||
|
||||
listener.onFailure(host);
|
||||
|
||||
verify(sniffer).sniffOnFailure(host);
|
||||
verify(sniffer).sniffOnFailure();
|
||||
}
|
||||
|
||||
public void testResourceNotifiedOnFailure() {
|
||||
|
@ -71,7 +71,7 @@ public class NodeFailureListenerTests extends ESTestCase {
|
|||
}
|
||||
|
||||
if (optionalSniffer != null) {
|
||||
verify(sniffer).sniffOnFailure(host);
|
||||
verify(sniffer).sniffOnFailure();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue