Merge branch 'master' into ccr
* master: Deprecates indexing and querying a context completion field without context (#30712) Refactor Sniffer and make it testable (#29638) [Docs] Fix typo in Min Aggregation reference (#30899)
This commit is contained in:
commit
ba78aa8c02
|
@ -61,6 +61,7 @@ import java.util.Comparator;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.HashSet;
|
import java.util.HashSet;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
|
import java.util.LinkedHashSet;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Locale;
|
import java.util.Locale;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
@ -132,7 +133,7 @@ public class RestClient implements Closeable {
|
||||||
if (hosts == null || hosts.length == 0) {
|
if (hosts == null || hosts.length == 0) {
|
||||||
throw new IllegalArgumentException("hosts must not be null nor empty");
|
throw new IllegalArgumentException("hosts must not be null nor empty");
|
||||||
}
|
}
|
||||||
Set<HttpHost> httpHosts = new HashSet<>();
|
Set<HttpHost> httpHosts = new LinkedHashSet<>();
|
||||||
AuthCache authCache = new BasicAuthCache();
|
AuthCache authCache = new BasicAuthCache();
|
||||||
for (HttpHost host : hosts) {
|
for (HttpHost host : hosts) {
|
||||||
Objects.requireNonNull(host, "host cannot be null");
|
Objects.requireNonNull(host, "host cannot be null");
|
||||||
|
@ -143,6 +144,13 @@ public class RestClient implements Closeable {
|
||||||
this.blacklist.clear();
|
this.blacklist.clear();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the configured hosts
|
||||||
|
*/
|
||||||
|
public List<HttpHost> getHosts() {
|
||||||
|
return new ArrayList<>(hostTuple.hosts);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Sends a request to the Elasticsearch cluster that the client points to.
|
* Sends a request to the Elasticsearch cluster that the client points to.
|
||||||
* Blocks until the request is completed and returns its response or fails
|
* Blocks until the request is completed and returns its response or fails
|
||||||
|
|
|
@ -25,6 +25,7 @@ import org.apache.http.impl.nio.client.CloseableHttpAsyncClient;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
@ -251,6 +252,37 @@ public class RestClientTests extends RestClientTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testSetHostsPreservesOrdering() throws Exception {
|
||||||
|
try (RestClient restClient = createRestClient()) {
|
||||||
|
HttpHost[] hosts = randomHosts();
|
||||||
|
restClient.setHosts(hosts);
|
||||||
|
assertEquals(Arrays.asList(hosts), restClient.getHosts());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static HttpHost[] randomHosts() {
|
||||||
|
int numHosts = randomIntBetween(1, 10);
|
||||||
|
HttpHost[] hosts = new HttpHost[numHosts];
|
||||||
|
for (int i = 0; i < hosts.length; i++) {
|
||||||
|
hosts[i] = new HttpHost("host-" + i, 9200);
|
||||||
|
}
|
||||||
|
return hosts;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSetHostsDuplicatedHosts() throws Exception {
|
||||||
|
try (RestClient restClient = createRestClient()) {
|
||||||
|
int numHosts = randomIntBetween(1, 10);
|
||||||
|
HttpHost[] hosts = new HttpHost[numHosts];
|
||||||
|
HttpHost host = new HttpHost("host", 9200);
|
||||||
|
for (int i = 0; i < hosts.length; i++) {
|
||||||
|
hosts[i] = host;
|
||||||
|
}
|
||||||
|
restClient.setHosts(hosts);
|
||||||
|
assertEquals(1, restClient.getHosts().size());
|
||||||
|
assertEquals(host, restClient.getHosts().get(0));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated will remove method in 7.0 but needs tests until then. Replaced by {@link RequestTests#testConstructor()}.
|
* @deprecated will remove method in 7.0 but needs tests until then. Replaced by {@link RequestTests#testConstructor()}.
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -58,7 +58,6 @@ public class SniffOnFailureListener extends RestClient.FailureListener {
|
||||||
if (sniffer == null) {
|
if (sniffer == null) {
|
||||||
throw new IllegalStateException("sniffer was not set, unable to sniff on failure");
|
throw new IllegalStateException("sniffer was not set, unable to sniff on failure");
|
||||||
}
|
}
|
||||||
//re-sniff immediately but take out the node that failed
|
sniffer.sniffOnFailure();
|
||||||
sniffer.sniffOnFailure(host);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -31,12 +31,14 @@ import java.security.AccessController;
|
||||||
import java.security.PrivilegedAction;
|
import java.security.PrivilegedAction;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ScheduledExecutorService;
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
import java.util.concurrent.ScheduledFuture;
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class responsible for sniffing nodes from some source (default is elasticsearch itself) and setting them to a provided instance of
|
* Class responsible for sniffing nodes from some source (default is elasticsearch itself) and setting them to a provided instance of
|
||||||
|
@ -51,101 +53,175 @@ public class Sniffer implements Closeable {
|
||||||
private static final Log logger = LogFactory.getLog(Sniffer.class);
|
private static final Log logger = LogFactory.getLog(Sniffer.class);
|
||||||
private static final String SNIFFER_THREAD_NAME = "es_rest_client_sniffer";
|
private static final String SNIFFER_THREAD_NAME = "es_rest_client_sniffer";
|
||||||
|
|
||||||
private final Task task;
|
private final HostsSniffer hostsSniffer;
|
||||||
|
private final RestClient restClient;
|
||||||
|
private final long sniffIntervalMillis;
|
||||||
|
private final long sniffAfterFailureDelayMillis;
|
||||||
|
private final Scheduler scheduler;
|
||||||
|
private final AtomicBoolean initialized = new AtomicBoolean(false);
|
||||||
|
private volatile ScheduledTask nextScheduledTask;
|
||||||
|
|
||||||
Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay) {
|
Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay) {
|
||||||
this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay);
|
this(restClient, hostsSniffer, new DefaultScheduler(), sniffInterval, sniffAfterFailureDelay);
|
||||||
|
}
|
||||||
|
|
||||||
|
Sniffer(RestClient restClient, HostsSniffer hostsSniffer, Scheduler scheduler, long sniffInterval, long sniffAfterFailureDelay) {
|
||||||
|
this.hostsSniffer = hostsSniffer;
|
||||||
|
this.restClient = restClient;
|
||||||
|
this.sniffIntervalMillis = sniffInterval;
|
||||||
|
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelay;
|
||||||
|
this.scheduler = scheduler;
|
||||||
|
/*
|
||||||
|
* The first sniffing round is async, so this constructor returns before nextScheduledTask is assigned to a task.
|
||||||
|
* The initialized flag is a protection against NPE due to that.
|
||||||
|
*/
|
||||||
|
Task task = new Task(sniffIntervalMillis) {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
super.run();
|
||||||
|
initialized.compareAndSet(false, true);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
/*
|
||||||
|
* We do not keep track of the returned future as we never intend to cancel the initial sniffing round, we rather
|
||||||
|
* prevent any other operation from being executed till the sniffer is properly initialized
|
||||||
|
*/
|
||||||
|
scheduler.schedule(task, 0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Triggers a new sniffing round and explicitly takes out the failed host provided as argument
|
* Schedule sniffing to run as soon as possible if it isn't already running. Once such sniffing round runs
|
||||||
|
* it will also schedule a new round after sniffAfterFailureDelay ms.
|
||||||
*/
|
*/
|
||||||
public void sniffOnFailure(HttpHost failedHost) {
|
public void sniffOnFailure() {
|
||||||
this.task.sniffOnFailure(failedHost);
|
//sniffOnFailure does nothing until the initial sniffing round has been completed
|
||||||
|
if (initialized.get()) {
|
||||||
|
/*
|
||||||
|
* If sniffing is already running, there is no point in scheduling another round right after the current one.
|
||||||
|
* Concurrent calls may be checking the same task state, but only the first skip call on the same task returns true.
|
||||||
|
* The task may also get replaced while we check its state, in which case calling skip on it returns false.
|
||||||
|
*/
|
||||||
|
if (this.nextScheduledTask.skip()) {
|
||||||
|
/*
|
||||||
|
* We do not keep track of this future as the task will immediately run and we don't intend to cancel it
|
||||||
|
* due to concurrent sniffOnFailure runs. Effectively the previous (now cancelled or skipped) task will stay
|
||||||
|
* assigned to nextTask till this onFailure round gets run and schedules its corresponding afterFailure round.
|
||||||
|
*/
|
||||||
|
scheduler.schedule(new Task(sniffAfterFailureDelayMillis), 0L);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
enum TaskState {
|
||||||
public void close() throws IOException {
|
WAITING, SKIPPED, STARTED
|
||||||
task.shutdown();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class Task implements Runnable {
|
class Task implements Runnable {
|
||||||
private final HostsSniffer hostsSniffer;
|
final long nextTaskDelay;
|
||||||
private final RestClient restClient;
|
final AtomicReference<TaskState> taskState = new AtomicReference<>(TaskState.WAITING);
|
||||||
|
|
||||||
private final long sniffIntervalMillis;
|
Task(long nextTaskDelay) {
|
||||||
private final long sniffAfterFailureDelayMillis;
|
this.nextTaskDelay = nextTaskDelay;
|
||||||
private final ScheduledExecutorService scheduledExecutorService;
|
|
||||||
private final AtomicBoolean running = new AtomicBoolean(false);
|
|
||||||
private ScheduledFuture<?> scheduledFuture;
|
|
||||||
|
|
||||||
private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffIntervalMillis, long sniffAfterFailureDelayMillis) {
|
|
||||||
this.hostsSniffer = hostsSniffer;
|
|
||||||
this.restClient = restClient;
|
|
||||||
this.sniffIntervalMillis = sniffIntervalMillis;
|
|
||||||
this.sniffAfterFailureDelayMillis = sniffAfterFailureDelayMillis;
|
|
||||||
SnifferThreadFactory threadFactory = new SnifferThreadFactory(SNIFFER_THREAD_NAME);
|
|
||||||
this.scheduledExecutorService = Executors.newScheduledThreadPool(1, threadFactory);
|
|
||||||
scheduleNextRun(0);
|
|
||||||
}
|
|
||||||
|
|
||||||
synchronized void scheduleNextRun(long delayMillis) {
|
|
||||||
if (scheduledExecutorService.isShutdown() == false) {
|
|
||||||
try {
|
|
||||||
if (scheduledFuture != null) {
|
|
||||||
//regardless of when the next sniff is scheduled, cancel it and schedule a new one with updated delay
|
|
||||||
this.scheduledFuture.cancel(false);
|
|
||||||
}
|
|
||||||
logger.debug("scheduling next sniff in " + delayMillis + " ms");
|
|
||||||
this.scheduledFuture = this.scheduledExecutorService.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
|
|
||||||
} catch(Exception e) {
|
|
||||||
logger.error("error while scheduling next sniffer task", e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
sniff(null, sniffIntervalMillis);
|
/*
|
||||||
|
* Skipped or already started tasks do nothing. In most cases tasks will be cancelled and not run, but we want to protect for
|
||||||
|
* cases where future#cancel returns true yet the task runs. We want to make sure that such tasks do nothing otherwise they will
|
||||||
|
* schedule another round at the end and so on, leaving us with multiple parallel sniffing "tracks" whish is undesirable.
|
||||||
|
*/
|
||||||
|
if (taskState.compareAndSet(TaskState.WAITING, TaskState.STARTED) == false) {
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
void sniffOnFailure(HttpHost failedHost) {
|
|
||||||
sniff(failedHost, sniffAfterFailureDelayMillis);
|
|
||||||
}
|
|
||||||
|
|
||||||
void sniff(HttpHost excludeHost, long nextSniffDelayMillis) {
|
|
||||||
if (running.compareAndSet(false, true)) {
|
|
||||||
try {
|
try {
|
||||||
|
sniff();
|
||||||
|
} catch (Exception e) {
|
||||||
|
logger.error("error while sniffing nodes", e);
|
||||||
|
} finally {
|
||||||
|
Task task = new Task(sniffIntervalMillis);
|
||||||
|
Future<?> future = scheduler.schedule(task, nextTaskDelay);
|
||||||
|
//tasks are run by a single threaded executor, so swapping is safe with a simple volatile variable
|
||||||
|
ScheduledTask previousTask = nextScheduledTask;
|
||||||
|
nextScheduledTask = new ScheduledTask(task, future);
|
||||||
|
assert initialized.get() == false ||
|
||||||
|
previousTask.task.isSkipped() || previousTask.task.hasStarted() : "task that we are replacing is neither " +
|
||||||
|
"cancelled nor has it ever started";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if the task has started, false in case it didn't start (yet?) or it was skipped
|
||||||
|
*/
|
||||||
|
boolean hasStarted() {
|
||||||
|
return taskState.get() == TaskState.STARTED;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Sets this task to be skipped. Returns true if the task will be skipped, false if the task has already started.
|
||||||
|
*/
|
||||||
|
boolean skip() {
|
||||||
|
/*
|
||||||
|
* Threads may still get run although future#cancel returns true. We make sure that a task is either cancelled (or skipped),
|
||||||
|
* or entirely run. In the odd case that future#cancel returns true and the thread still runs, the task won't do anything.
|
||||||
|
* In case future#cancel returns true but the task has already started, this state change will not succeed hence this method
|
||||||
|
* returns false and the task will normally run.
|
||||||
|
*/
|
||||||
|
return taskState.compareAndSet(TaskState.WAITING, TaskState.SKIPPED);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns true if the task was set to be skipped before it was started
|
||||||
|
*/
|
||||||
|
boolean isSkipped() {
|
||||||
|
return taskState.get() == TaskState.SKIPPED;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static final class ScheduledTask {
|
||||||
|
final Task task;
|
||||||
|
final Future<?> future;
|
||||||
|
|
||||||
|
ScheduledTask(Task task, Future<?> future) {
|
||||||
|
this.task = task;
|
||||||
|
this.future = future;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Cancels this task. Returns true if the task has been successfully cancelled, meaning it won't be executed
|
||||||
|
* or if it is its execution won't have any effect. Returns false if the task cannot be cancelled (possibly it was
|
||||||
|
* already cancelled or already completed).
|
||||||
|
*/
|
||||||
|
boolean skip() {
|
||||||
|
/*
|
||||||
|
* Future#cancel should return false whenever a task cannot be cancelled, most likely as it has already started. We don't
|
||||||
|
* trust it much though so we try to cancel hoping that it will work. At the same time we always call skip too, which means
|
||||||
|
* that if the task has already started the state change will fail. We could potentially not call skip when cancel returns
|
||||||
|
* false but we prefer to stay on the safe side.
|
||||||
|
*/
|
||||||
|
future.cancel(false);
|
||||||
|
return task.skip();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
final void sniff() throws IOException {
|
||||||
List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
|
List<HttpHost> sniffedHosts = hostsSniffer.sniffHosts();
|
||||||
|
if (logger.isDebugEnabled()) {
|
||||||
logger.debug("sniffed hosts: " + sniffedHosts);
|
logger.debug("sniffed hosts: " + sniffedHosts);
|
||||||
if (excludeHost != null) {
|
|
||||||
sniffedHosts.remove(excludeHost);
|
|
||||||
}
|
}
|
||||||
if (sniffedHosts.isEmpty()) {
|
if (sniffedHosts.isEmpty()) {
|
||||||
logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
|
logger.warn("no hosts to set, hosts will be updated at the next sniffing round");
|
||||||
} else {
|
} else {
|
||||||
this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
|
restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()]));
|
||||||
}
|
|
||||||
} catch (Exception e) {
|
|
||||||
logger.error("error while sniffing nodes", e);
|
|
||||||
} finally {
|
|
||||||
scheduleNextRun(nextSniffDelayMillis);
|
|
||||||
running.set(false);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized void shutdown() {
|
@Override
|
||||||
scheduledExecutorService.shutdown();
|
public void close() {
|
||||||
try {
|
if (initialized.get()) {
|
||||||
if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
|
nextScheduledTask.skip();
|
||||||
return;
|
|
||||||
}
|
|
||||||
scheduledExecutorService.shutdownNow();
|
|
||||||
} catch (InterruptedException e) {
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
this.scheduler.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -158,8 +234,62 @@ public class Sniffer implements Closeable {
|
||||||
return new SnifferBuilder(restClient);
|
return new SnifferBuilder(restClient);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class SnifferThreadFactory implements ThreadFactory {
|
/**
|
||||||
|
* The Scheduler interface allows to isolate the sniffing scheduling aspects so that we can test
|
||||||
|
* the sniffer by injecting when needed a custom scheduler that is more suited for testing.
|
||||||
|
*/
|
||||||
|
interface Scheduler {
|
||||||
|
/**
|
||||||
|
* Schedules the provided {@link Runnable} to be executed in <code>delayMillis</code> milliseconds
|
||||||
|
*/
|
||||||
|
Future<?> schedule(Task task, long delayMillis);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shuts this scheduler down
|
||||||
|
*/
|
||||||
|
void shutdown();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Default implementation of {@link Scheduler}, based on {@link ScheduledExecutorService}
|
||||||
|
*/
|
||||||
|
static final class DefaultScheduler implements Scheduler {
|
||||||
|
final ScheduledExecutorService executor;
|
||||||
|
|
||||||
|
DefaultScheduler() {
|
||||||
|
this(initScheduledExecutorService());
|
||||||
|
}
|
||||||
|
|
||||||
|
DefaultScheduler(ScheduledExecutorService executor) {
|
||||||
|
this.executor = executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ScheduledExecutorService initScheduledExecutorService() {
|
||||||
|
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, new SnifferThreadFactory(SNIFFER_THREAD_NAME));
|
||||||
|
executor.setRemoveOnCancelPolicy(true);
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Future<?> schedule(Task task, long delayMillis) {
|
||||||
|
return executor.schedule(task, delayMillis, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
executor.shutdown();
|
||||||
|
try {
|
||||||
|
if (executor.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
executor.shutdownNow();
|
||||||
|
} catch (InterruptedException ignore) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class SnifferThreadFactory implements ThreadFactory {
|
||||||
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
private final AtomicInteger threadNumber = new AtomicInteger(1);
|
||||||
private final String namePrefix;
|
private final String namePrefix;
|
||||||
private final ThreadFactory originalThreadFactory;
|
private final ThreadFactory originalThreadFactory;
|
||||||
|
|
|
@ -21,7 +21,6 @@ package org.elasticsearch.client.sniff;
|
||||||
|
|
||||||
import org.apache.http.HttpHost;
|
import org.apache.http.HttpHost;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
|
@ -30,7 +29,7 @@ import java.util.List;
|
||||||
*/
|
*/
|
||||||
class MockHostsSniffer implements HostsSniffer {
|
class MockHostsSniffer implements HostsSniffer {
|
||||||
@Override
|
@Override
|
||||||
public List<HttpHost> sniffHosts() throws IOException {
|
public List<HttpHost> sniffHosts() {
|
||||||
return Collections.singletonList(new HttpHost("localhost", 9200));
|
return Collections.singletonList(new HttpHost("localhost", 9200));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,656 @@
|
||||||
|
/*
|
||||||
|
* Licensed to Elasticsearch under one or more contributor
|
||||||
|
* license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright
|
||||||
|
* ownership. Elasticsearch licenses this file to you under
|
||||||
|
* the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
* not use this file except in compliance with the License.
|
||||||
|
* You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing,
|
||||||
|
* software distributed under the License is distributed on an
|
||||||
|
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||||
|
* KIND, either express or implied. See the License for the
|
||||||
|
* specific language governing permissions and limitations
|
||||||
|
* under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.elasticsearch.client.sniff;
|
||||||
|
|
||||||
|
import org.apache.http.HttpHost;
|
||||||
|
import org.elasticsearch.client.RestClient;
|
||||||
|
import org.elasticsearch.client.RestClientTestCase;
|
||||||
|
import org.elasticsearch.client.sniff.Sniffer.DefaultScheduler;
|
||||||
|
import org.elasticsearch.client.sniff.Sniffer.Scheduler;
|
||||||
|
import org.mockito.Matchers;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.CancellationException;
|
||||||
|
import java.util.concurrent.CopyOnWriteArraySet;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ScheduledExecutorService;
|
||||||
|
import java.util.concurrent.ScheduledFuture;
|
||||||
|
import java.util.concurrent.ScheduledThreadPoolExecutor;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
|
||||||
|
import static org.hamcrest.CoreMatchers.equalTo;
|
||||||
|
import static org.hamcrest.CoreMatchers.instanceOf;
|
||||||
|
import static org.hamcrest.Matchers.greaterThan;
|
||||||
|
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertSame;
|
||||||
|
import static org.junit.Assert.assertThat;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.junit.Assert.fail;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.times;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class SnifferTests extends RestClientTestCase {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests the {@link Sniffer#sniff()} method in isolation. Verifies that it uses the {@link HostsSniffer} implementation
|
||||||
|
* to retrieve nodes and set them (when not empty) to the provided {@link RestClient} instance.
|
||||||
|
*/
|
||||||
|
public void testSniff() throws IOException {
|
||||||
|
HttpHost initialHost = new HttpHost("localhost", 9200);
|
||||||
|
try (RestClient restClient = RestClient.builder(initialHost).build()) {
|
||||||
|
Scheduler noOpScheduler = new Scheduler() {
|
||||||
|
@Override
|
||||||
|
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||||
|
return mock(Future.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
|
CountingHostsSniffer hostsSniffer = new CountingHostsSniffer();
|
||||||
|
int iters = randomIntBetween(5, 30);
|
||||||
|
try (Sniffer sniffer = new Sniffer(restClient, hostsSniffer, noOpScheduler, 1000L, -1)){
|
||||||
|
{
|
||||||
|
assertEquals(1, restClient.getHosts().size());
|
||||||
|
HttpHost httpHost = restClient.getHosts().get(0);
|
||||||
|
assertEquals("localhost", httpHost.getHostName());
|
||||||
|
assertEquals(9200, httpHost.getPort());
|
||||||
|
}
|
||||||
|
int emptyList = 0;
|
||||||
|
int failures = 0;
|
||||||
|
int runs = 0;
|
||||||
|
List<HttpHost> lastHosts = Collections.singletonList(initialHost);
|
||||||
|
for (int i = 0; i < iters; i++) {
|
||||||
|
try {
|
||||||
|
runs++;
|
||||||
|
sniffer.sniff();
|
||||||
|
if (hostsSniffer.failures.get() > failures) {
|
||||||
|
failures++;
|
||||||
|
fail("should have failed given that hostsSniffer says it threw an exception");
|
||||||
|
} else if (hostsSniffer.emptyList.get() > emptyList) {
|
||||||
|
emptyList++;
|
||||||
|
assertEquals(lastHosts, restClient.getHosts());
|
||||||
|
} else {
|
||||||
|
assertNotEquals(lastHosts, restClient.getHosts());
|
||||||
|
List<HttpHost> expectedHosts = CountingHostsSniffer.buildHosts(runs);
|
||||||
|
assertEquals(expectedHosts, restClient.getHosts());
|
||||||
|
lastHosts = restClient.getHosts();
|
||||||
|
}
|
||||||
|
} catch(IOException e) {
|
||||||
|
if (hostsSniffer.failures.get() > failures) {
|
||||||
|
failures++;
|
||||||
|
assertEquals("communication breakdown", e.getMessage());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(hostsSniffer.emptyList.get(), emptyList);
|
||||||
|
assertEquals(hostsSniffer.failures.get(), failures);
|
||||||
|
assertEquals(hostsSniffer.runs.get(), runs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test multiple sniffing rounds by mocking the {@link Scheduler} as well as the {@link HostsSniffer}.
|
||||||
|
* Simulates the ordinary behaviour of {@link Sniffer} when sniffing on failure is not enabled.
|
||||||
|
* The {@link CountingHostsSniffer} doesn't make any network connection but may throw exception or return no hosts, which makes
|
||||||
|
* it possible to verify that errors are properly handled and don't affect subsequent runs and their scheduling.
|
||||||
|
* The {@link Scheduler} implementation submits rather than scheduling tasks, meaning that it doesn't respect the requested sniff
|
||||||
|
* delays while allowing to assert that the requested delays for each requested run and the following one are the expected values.
|
||||||
|
*/
|
||||||
|
public void testOrdinarySniffRounds() throws Exception {
|
||||||
|
final long sniffInterval = randomLongBetween(1, Long.MAX_VALUE);
|
||||||
|
long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE);
|
||||||
|
RestClient restClient = mock(RestClient.class);
|
||||||
|
CountingHostsSniffer hostsSniffer = new CountingHostsSniffer();
|
||||||
|
final int iters = randomIntBetween(30, 100);
|
||||||
|
final Set<Future<?>> futures = new CopyOnWriteArraySet<>();
|
||||||
|
final CountDownLatch completionLatch = new CountDownLatch(1);
|
||||||
|
final AtomicInteger runs = new AtomicInteger(iters);
|
||||||
|
final ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||||
|
final AtomicReference<Future<?>> lastFuture = new AtomicReference<>();
|
||||||
|
final AtomicReference<Sniffer.Task> lastTask = new AtomicReference<>();
|
||||||
|
Scheduler scheduler = new Scheduler() {
|
||||||
|
@Override
|
||||||
|
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||||
|
assertEquals(sniffInterval, task.nextTaskDelay);
|
||||||
|
int numberOfRuns = runs.getAndDecrement();
|
||||||
|
if (numberOfRuns == iters) {
|
||||||
|
//the first call is to schedule the first sniff round from the Sniffer constructor, with delay O
|
||||||
|
assertEquals(0L, delayMillis);
|
||||||
|
assertEquals(sniffInterval, task.nextTaskDelay);
|
||||||
|
} else {
|
||||||
|
//all of the subsequent times "schedule" is called with delay set to the configured sniff interval
|
||||||
|
assertEquals(sniffInterval, delayMillis);
|
||||||
|
assertEquals(sniffInterval, task.nextTaskDelay);
|
||||||
|
if (numberOfRuns == 0) {
|
||||||
|
completionLatch.countDown();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//we submit rather than scheduling to make the test quick and not depend on time
|
||||||
|
Future<?> future = executor.submit(task);
|
||||||
|
futures.add(future);
|
||||||
|
if (numberOfRuns == 1) {
|
||||||
|
lastFuture.set(future);
|
||||||
|
lastTask.set(task);
|
||||||
|
}
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
//the executor is closed externally, shutdown is tested separately
|
||||||
|
}
|
||||||
|
};
|
||||||
|
try {
|
||||||
|
new Sniffer(restClient, hostsSniffer, scheduler, sniffInterval, sniffAfterFailureDelay);
|
||||||
|
assertTrue("timeout waiting for sniffing rounds to be completed", completionLatch.await(1000, TimeUnit.MILLISECONDS));
|
||||||
|
assertEquals(iters, futures.size());
|
||||||
|
//the last future is the only one that may not be completed yet, as the count down happens
|
||||||
|
//while scheduling the next round which is still part of the execution of the runnable itself.
|
||||||
|
assertTrue(lastTask.get().hasStarted());
|
||||||
|
lastFuture.get().get();
|
||||||
|
for (Future<?> future : futures) {
|
||||||
|
assertTrue(future.isDone());
|
||||||
|
future.get();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
executor.shutdown();
|
||||||
|
assertTrue(executor.awaitTermination(1000, TimeUnit.MILLISECONDS));
|
||||||
|
}
|
||||||
|
int totalRuns = hostsSniffer.runs.get();
|
||||||
|
assertEquals(iters, totalRuns);
|
||||||
|
int setHostsRuns = totalRuns - hostsSniffer.failures.get() - hostsSniffer.emptyList.get();
|
||||||
|
verify(restClient, times(setHostsRuns)).setHosts(Matchers.<HttpHost>anyVararg());
|
||||||
|
verifyNoMoreInteractions(restClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that {@link Sniffer#close()} shuts down the underlying {@link Scheduler}, and that such calls are idempotent.
|
||||||
|
* Also verifies that the next scheduled round gets cancelled.
|
||||||
|
*/
|
||||||
|
public void testClose() {
|
||||||
|
final Future<?> future = mock(Future.class);
|
||||||
|
long sniffInterval = randomLongBetween(1, Long.MAX_VALUE);
|
||||||
|
long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE);
|
||||||
|
RestClient restClient = mock(RestClient.class);
|
||||||
|
final AtomicInteger shutdown = new AtomicInteger(0);
|
||||||
|
final AtomicBoolean initialized = new AtomicBoolean(false);
|
||||||
|
Scheduler scheduler = new Scheduler() {
|
||||||
|
@Override
|
||||||
|
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||||
|
if (initialized.compareAndSet(false, true)) {
|
||||||
|
//run from the same thread so the sniffer gets for sure initialized and the scheduled task gets cancelled on close
|
||||||
|
task.run();
|
||||||
|
}
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
shutdown.incrementAndGet();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Sniffer sniffer = new Sniffer(restClient, new MockHostsSniffer(), scheduler, sniffInterval, sniffAfterFailureDelay);
|
||||||
|
assertEquals(0, shutdown.get());
|
||||||
|
int iters = randomIntBetween(3, 10);
|
||||||
|
for (int i = 1; i <= iters; i++) {
|
||||||
|
sniffer.close();
|
||||||
|
verify(future, times(i)).cancel(false);
|
||||||
|
assertEquals(i, shutdown.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testSniffOnFailureNotInitialized() {
|
||||||
|
RestClient restClient = mock(RestClient.class);
|
||||||
|
CountingHostsSniffer hostsSniffer = new CountingHostsSniffer();
|
||||||
|
long sniffInterval = randomLongBetween(1, Long.MAX_VALUE);
|
||||||
|
long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE);
|
||||||
|
final AtomicInteger scheduleCalls = new AtomicInteger(0);
|
||||||
|
Scheduler scheduler = new Scheduler() {
|
||||||
|
@Override
|
||||||
|
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||||
|
scheduleCalls.incrementAndGet();
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
Sniffer sniffer = new Sniffer(restClient, hostsSniffer, scheduler, sniffInterval, sniffAfterFailureDelay);
|
||||||
|
for (int i = 0; i < 10; i++) {
|
||||||
|
sniffer.sniffOnFailure();
|
||||||
|
}
|
||||||
|
assertEquals(1, scheduleCalls.get());
|
||||||
|
int totalRuns = hostsSniffer.runs.get();
|
||||||
|
assertEquals(0, totalRuns);
|
||||||
|
int setHostsRuns = totalRuns - hostsSniffer.failures.get() - hostsSniffer.emptyList.get();
|
||||||
|
verify(restClient, times(setHostsRuns)).setHosts(Matchers.<HttpHost>anyVararg());
|
||||||
|
verifyNoMoreInteractions(restClient);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test behaviour when a bunch of onFailure sniffing rounds are triggered in parallel. Each run will always
|
||||||
|
* schedule a subsequent afterFailure round. Also, for each onFailure round that starts, the net scheduled round
|
||||||
|
* (either afterFailure or ordinary) gets cancelled.
|
||||||
|
*/
|
||||||
|
public void testSniffOnFailure() throws Exception {
|
||||||
|
RestClient restClient = mock(RestClient.class);
|
||||||
|
CountingHostsSniffer hostsSniffer = new CountingHostsSniffer();
|
||||||
|
final AtomicBoolean initializing = new AtomicBoolean(true);
|
||||||
|
final long sniffInterval = randomLongBetween(1, Long.MAX_VALUE);
|
||||||
|
final long sniffAfterFailureDelay = randomLongBetween(1, Long.MAX_VALUE);
|
||||||
|
int minNumOnFailureRounds = randomIntBetween(5, 10);
|
||||||
|
final CountDownLatch initializingLatch = new CountDownLatch(1);
|
||||||
|
final Set<Sniffer.ScheduledTask> ordinaryRoundsTasks = new CopyOnWriteArraySet<>();
|
||||||
|
final AtomicReference<Future<?>> initializingFuture = new AtomicReference<>();
|
||||||
|
final Set<Sniffer.ScheduledTask> onFailureTasks = new CopyOnWriteArraySet<>();
|
||||||
|
final Set<Sniffer.ScheduledTask> afterFailureTasks = new CopyOnWriteArraySet<>();
|
||||||
|
final AtomicBoolean onFailureCompleted = new AtomicBoolean(false);
|
||||||
|
final CountDownLatch completionLatch = new CountDownLatch(1);
|
||||||
|
final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
try {
|
||||||
|
Scheduler scheduler = new Scheduler() {
|
||||||
|
@Override
|
||||||
|
public Future<?> schedule(final Sniffer.Task task, long delayMillis) {
|
||||||
|
if (initializing.compareAndSet(true, false)) {
|
||||||
|
assertEquals(0L, delayMillis);
|
||||||
|
Future<?> future = executor.submit(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
} finally {
|
||||||
|
//we need to make sure that the sniffer is initialized, so the sniffOnFailure
|
||||||
|
//call does what it needs to do. Otherwise nothing happens until initialized.
|
||||||
|
initializingLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertTrue(initializingFuture.compareAndSet(null, future));
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
if (delayMillis == 0L) {
|
||||||
|
Future<?> future = executor.submit(task);
|
||||||
|
onFailureTasks.add(new Sniffer.ScheduledTask(task, future));
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
if (delayMillis == sniffAfterFailureDelay) {
|
||||||
|
Future<?> future = scheduleOrSubmit(task);
|
||||||
|
afterFailureTasks.add(new Sniffer.ScheduledTask(task, future));
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
assertEquals(sniffInterval, delayMillis);
|
||||||
|
assertEquals(sniffInterval, task.nextTaskDelay);
|
||||||
|
|
||||||
|
if (onFailureCompleted.get() && onFailureTasks.size() == afterFailureTasks.size()) {
|
||||||
|
completionLatch.countDown();
|
||||||
|
return mock(Future.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
Future<?> future = scheduleOrSubmit(task);
|
||||||
|
ordinaryRoundsTasks.add(new Sniffer.ScheduledTask(task, future));
|
||||||
|
return future;
|
||||||
|
}
|
||||||
|
|
||||||
|
private Future<?> scheduleOrSubmit(Sniffer.Task task) {
|
||||||
|
if (randomBoolean()) {
|
||||||
|
return executor.schedule(task, randomLongBetween(0L, 200L), TimeUnit.MILLISECONDS);
|
||||||
|
} else {
|
||||||
|
return executor.submit(task);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
final Sniffer sniffer = new Sniffer(restClient, hostsSniffer, scheduler, sniffInterval, sniffAfterFailureDelay);
|
||||||
|
assertTrue("timeout waiting for sniffer to get initialized", initializingLatch.await(1000, TimeUnit.MILLISECONDS));
|
||||||
|
|
||||||
|
ExecutorService onFailureExecutor = Executors.newFixedThreadPool(randomIntBetween(5, 20));
|
||||||
|
Set<Future<?>> onFailureFutures = new CopyOnWriteArraySet<>();
|
||||||
|
try {
|
||||||
|
//with tasks executing quickly one after each other, it is very likely that the onFailure round gets skipped
|
||||||
|
//as another round is already running. We retry till enough runs get through as that's what we want to test.
|
||||||
|
while (onFailureTasks.size() < minNumOnFailureRounds) {
|
||||||
|
onFailureFutures.add(onFailureExecutor.submit(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
sniffer.sniffOnFailure();
|
||||||
|
}
|
||||||
|
}));
|
||||||
|
}
|
||||||
|
assertThat(onFailureFutures.size(), greaterThanOrEqualTo(minNumOnFailureRounds));
|
||||||
|
for (Future<?> onFailureFuture : onFailureFutures) {
|
||||||
|
assertNull(onFailureFuture.get());
|
||||||
|
}
|
||||||
|
onFailureCompleted.set(true);
|
||||||
|
} finally {
|
||||||
|
onFailureExecutor.shutdown();
|
||||||
|
onFailureExecutor.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
assertFalse(initializingFuture.get().isCancelled());
|
||||||
|
assertTrue(initializingFuture.get().isDone());
|
||||||
|
assertNull(initializingFuture.get().get());
|
||||||
|
|
||||||
|
assertTrue("timeout waiting for sniffing rounds to be completed", completionLatch.await(1000, TimeUnit.MILLISECONDS));
|
||||||
|
assertThat(onFailureTasks.size(), greaterThanOrEqualTo(minNumOnFailureRounds));
|
||||||
|
assertEquals(onFailureTasks.size(), afterFailureTasks.size());
|
||||||
|
|
||||||
|
for (Sniffer.ScheduledTask onFailureTask : onFailureTasks) {
|
||||||
|
assertFalse(onFailureTask.future.isCancelled());
|
||||||
|
assertTrue(onFailureTask.future.isDone());
|
||||||
|
assertNull(onFailureTask.future.get());
|
||||||
|
assertTrue(onFailureTask.task.hasStarted());
|
||||||
|
assertFalse(onFailureTask.task.isSkipped());
|
||||||
|
}
|
||||||
|
|
||||||
|
int cancelledTasks = 0;
|
||||||
|
int completedTasks = onFailureTasks.size() + 1;
|
||||||
|
for (Sniffer.ScheduledTask afterFailureTask : afterFailureTasks) {
|
||||||
|
if (assertTaskCancelledOrCompleted(afterFailureTask)) {
|
||||||
|
completedTasks++;
|
||||||
|
} else {
|
||||||
|
cancelledTasks++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
assertThat(ordinaryRoundsTasks.size(), greaterThan(0));
|
||||||
|
for (Sniffer.ScheduledTask task : ordinaryRoundsTasks) {
|
||||||
|
if (assertTaskCancelledOrCompleted(task)) {
|
||||||
|
completedTasks++;
|
||||||
|
} else {
|
||||||
|
cancelledTasks++;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
assertEquals(onFailureTasks.size(), cancelledTasks);
|
||||||
|
|
||||||
|
assertEquals(completedTasks, hostsSniffer.runs.get());
|
||||||
|
int setHostsRuns = hostsSniffer.runs.get() - hostsSniffer.failures.get() - hostsSniffer.emptyList.get();
|
||||||
|
verify(restClient, times(setHostsRuns)).setHosts(Matchers.<HttpHost>anyVararg());
|
||||||
|
verifyNoMoreInteractions(restClient);
|
||||||
|
} finally {
|
||||||
|
executor.shutdown();
|
||||||
|
executor.awaitTermination(1000L, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static boolean assertTaskCancelledOrCompleted(Sniffer.ScheduledTask task) throws ExecutionException, InterruptedException {
|
||||||
|
if (task.task.isSkipped()) {
|
||||||
|
assertTrue(task.future.isCancelled());
|
||||||
|
try {
|
||||||
|
task.future.get();
|
||||||
|
fail("cancellation exception should have been thrown");
|
||||||
|
} catch(CancellationException ignore) {
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
} else {
|
||||||
|
try {
|
||||||
|
assertNull(task.future.get());
|
||||||
|
} catch(CancellationException ignore) {
|
||||||
|
assertTrue(task.future.isCancelled());
|
||||||
|
}
|
||||||
|
assertTrue(task.future.isDone());
|
||||||
|
assertTrue(task.task.hasStarted());
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testTaskCancelling() throws Exception {
|
||||||
|
RestClient restClient = mock(RestClient.class);
|
||||||
|
HostsSniffer hostsSniffer = mock(HostsSniffer.class);
|
||||||
|
Scheduler noOpScheduler = new Scheduler() {
|
||||||
|
@Override
|
||||||
|
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Sniffer sniffer = new Sniffer(restClient, hostsSniffer, noOpScheduler, 0L, 0L);
|
||||||
|
ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();
|
||||||
|
try {
|
||||||
|
int numIters = randomIntBetween(50, 100);
|
||||||
|
for (int i = 0; i < numIters; i++) {
|
||||||
|
Sniffer.Task task = sniffer.new Task(0L);
|
||||||
|
TaskWrapper wrapper = new TaskWrapper(task);
|
||||||
|
Future<?> future;
|
||||||
|
if (rarely()) {
|
||||||
|
future = executor.schedule(wrapper, randomLongBetween(0L, 200L), TimeUnit.MILLISECONDS);
|
||||||
|
} else {
|
||||||
|
future = executor.submit(wrapper);
|
||||||
|
}
|
||||||
|
Sniffer.ScheduledTask scheduledTask = new Sniffer.ScheduledTask(task, future);
|
||||||
|
boolean skip = scheduledTask.skip();
|
||||||
|
try {
|
||||||
|
assertNull(future.get());
|
||||||
|
} catch(CancellationException ignore) {
|
||||||
|
assertTrue(future.isCancelled());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (skip) {
|
||||||
|
//the task was either cancelled before starting, in which case it will never start (thanks to Future#cancel),
|
||||||
|
//or skipped, in which case it will run but do nothing (thanks to Task#skip).
|
||||||
|
//Here we want to make sure that whenever skip returns true, the task either won't run or it won't do anything,
|
||||||
|
//otherwise we may end up with parallel sniffing tracks given that each task schedules the following one. We need to
|
||||||
|
// make sure that onFailure takes scheduling over while at the same time ordinary rounds don't go on.
|
||||||
|
assertFalse(task.hasStarted());
|
||||||
|
assertTrue(task.isSkipped());
|
||||||
|
assertTrue(future.isCancelled());
|
||||||
|
assertTrue(future.isDone());
|
||||||
|
} else {
|
||||||
|
//if a future is cancelled when its execution has already started, future#get throws CancellationException before
|
||||||
|
//completion. The execution continues though so we use a latch to try and wait for the task to be completed.
|
||||||
|
//Here we want to make sure that whenever skip returns false, the task will be completed, otherwise we may be
|
||||||
|
//missing to schedule the following round, which means no sniffing will ever happen again besides on failure sniffing.
|
||||||
|
assertTrue(wrapper.await());
|
||||||
|
//the future may or may not be cancelled but the task has for sure started and completed
|
||||||
|
assertTrue(task.toString(), task.hasStarted());
|
||||||
|
assertFalse(task.isSkipped());
|
||||||
|
assertTrue(future.isDone());
|
||||||
|
}
|
||||||
|
//subsequent cancel calls return false for sure
|
||||||
|
int cancelCalls = randomIntBetween(1, 10);
|
||||||
|
for (int j = 0; j < cancelCalls; j++) {
|
||||||
|
assertFalse(scheduledTask.skip());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
executor.shutdown();
|
||||||
|
executor.awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps a {@link Sniffer.Task} and allows to wait for its completion. This is needed to verify
|
||||||
|
* that tasks are either never started or always completed. Calling {@link Future#get()} against a cancelled future will
|
||||||
|
* throw {@link CancellationException} straight-away but the execution of the task will continue if it had already started,
|
||||||
|
* in which case {@link Future#cancel(boolean)} returns true which is not very helpful.
|
||||||
|
*/
|
||||||
|
private static final class TaskWrapper implements Runnable {
|
||||||
|
final Sniffer.Task task;
|
||||||
|
final CountDownLatch completionLatch = new CountDownLatch(1);
|
||||||
|
|
||||||
|
TaskWrapper(Sniffer.Task task) {
|
||||||
|
this.task = task;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
task.run();
|
||||||
|
} finally {
|
||||||
|
completionLatch.countDown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
boolean await() throws InterruptedException {
|
||||||
|
return completionLatch.await(1000, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Mock {@link HostsSniffer} implementation used for testing, which most of the times return a fixed host.
|
||||||
|
* It rarely throws exception or return an empty list of hosts, to make sure that such situations are properly handled.
|
||||||
|
* It also asserts that it never gets called concurrently, based on the assumption that only one sniff run can be run
|
||||||
|
* at a given point in time.
|
||||||
|
*/
|
||||||
|
private static class CountingHostsSniffer implements HostsSniffer {
|
||||||
|
private final AtomicInteger runs = new AtomicInteger(0);
|
||||||
|
private final AtomicInteger failures = new AtomicInteger(0);
|
||||||
|
private final AtomicInteger emptyList = new AtomicInteger(0);
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<HttpHost> sniffHosts() throws IOException {
|
||||||
|
int run = runs.incrementAndGet();
|
||||||
|
if (rarely()) {
|
||||||
|
failures.incrementAndGet();
|
||||||
|
//check that if communication breaks, sniffer keeps on working
|
||||||
|
throw new IOException("communication breakdown");
|
||||||
|
}
|
||||||
|
if (rarely()) {
|
||||||
|
emptyList.incrementAndGet();
|
||||||
|
return Collections.emptyList();
|
||||||
|
}
|
||||||
|
return buildHosts(run);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<HttpHost> buildHosts(int run) {
|
||||||
|
int size = run % 5 + 1;
|
||||||
|
assert size > 0;
|
||||||
|
List<HttpHost> hosts = new ArrayList<>(size);
|
||||||
|
for (int i = 0; i < size; i++) {
|
||||||
|
hosts.add(new HttpHost("sniffed-" + run, 9200 + i));
|
||||||
|
}
|
||||||
|
return hosts;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
public void testDefaultSchedulerSchedule() {
|
||||||
|
RestClient restClient = mock(RestClient.class);
|
||||||
|
HostsSniffer hostsSniffer = mock(HostsSniffer.class);
|
||||||
|
Scheduler noOpScheduler = new Scheduler() {
|
||||||
|
@Override
|
||||||
|
public Future<?> schedule(Sniffer.Task task, long delayMillis) {
|
||||||
|
return mock(Future.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void shutdown() {
|
||||||
|
|
||||||
|
}
|
||||||
|
};
|
||||||
|
Sniffer sniffer = new Sniffer(restClient, hostsSniffer, noOpScheduler, 0L, 0L);
|
||||||
|
Sniffer.Task task = sniffer.new Task(randomLongBetween(1, Long.MAX_VALUE));
|
||||||
|
|
||||||
|
ScheduledExecutorService scheduledExecutorService = mock(ScheduledExecutorService.class);
|
||||||
|
final ScheduledFuture<?> mockedFuture = mock(ScheduledFuture.class);
|
||||||
|
when(scheduledExecutorService.schedule(any(Runnable.class), any(Long.class), any(TimeUnit.class)))
|
||||||
|
.then(new Answer<ScheduledFuture<?>>() {
|
||||||
|
@Override
|
||||||
|
public ScheduledFuture<?> answer(InvocationOnMock invocationOnMock) {
|
||||||
|
return mockedFuture;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
DefaultScheduler scheduler = new DefaultScheduler(scheduledExecutorService);
|
||||||
|
long delay = randomLongBetween(1, Long.MAX_VALUE);
|
||||||
|
Future<?> future = scheduler.schedule(task, delay);
|
||||||
|
assertSame(mockedFuture, future);
|
||||||
|
verify(scheduledExecutorService).schedule(task, delay, TimeUnit.MILLISECONDS);
|
||||||
|
verifyNoMoreInteractions(scheduledExecutorService, mockedFuture);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDefaultSchedulerThreadFactory() {
|
||||||
|
DefaultScheduler defaultScheduler = new DefaultScheduler();
|
||||||
|
try {
|
||||||
|
ScheduledExecutorService executorService = defaultScheduler.executor;
|
||||||
|
assertThat(executorService, instanceOf(ScheduledThreadPoolExecutor.class));
|
||||||
|
assertThat(executorService, instanceOf(ScheduledThreadPoolExecutor.class));
|
||||||
|
ScheduledThreadPoolExecutor executor = (ScheduledThreadPoolExecutor) executorService;
|
||||||
|
assertTrue(executor.getRemoveOnCancelPolicy());
|
||||||
|
assertFalse(executor.getContinueExistingPeriodicTasksAfterShutdownPolicy());
|
||||||
|
assertTrue(executor.getExecuteExistingDelayedTasksAfterShutdownPolicy());
|
||||||
|
assertThat(executor.getThreadFactory(), instanceOf(Sniffer.SnifferThreadFactory.class));
|
||||||
|
int iters = randomIntBetween(3, 10);
|
||||||
|
for (int i = 1; i <= iters; i++) {
|
||||||
|
Thread thread = executor.getThreadFactory().newThread(new Runnable() {
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
|
||||||
|
}
|
||||||
|
});
|
||||||
|
assertThat(thread.getName(), equalTo("es_rest_client_sniffer[T#" + i + "]"));
|
||||||
|
assertThat(thread.isDaemon(), is(true));
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
defaultScheduler.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testDefaultSchedulerShutdown() throws Exception {
|
||||||
|
ScheduledThreadPoolExecutor executor = mock(ScheduledThreadPoolExecutor.class);
|
||||||
|
DefaultScheduler defaultScheduler = new DefaultScheduler(executor);
|
||||||
|
defaultScheduler.shutdown();
|
||||||
|
verify(executor).shutdown();
|
||||||
|
verify(executor).awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||||
|
verify(executor).shutdownNow();
|
||||||
|
verifyNoMoreInteractions(executor);
|
||||||
|
|
||||||
|
when(executor.awaitTermination(1000, TimeUnit.MILLISECONDS)).thenReturn(true);
|
||||||
|
defaultScheduler.shutdown();
|
||||||
|
verify(executor, times(2)).shutdown();
|
||||||
|
verify(executor, times(2)).awaitTermination(1000, TimeUnit.MILLISECONDS);
|
||||||
|
verifyNoMoreInteractions(executor);
|
||||||
|
}
|
||||||
|
}
|
|
@ -46,7 +46,7 @@ response.
|
||||||
|
|
||||||
==== Script
|
==== Script
|
||||||
|
|
||||||
The `min` aggregation can also calculate the maximum of a script. The example
|
The `min` aggregation can also calculate the minimum of a script. The example
|
||||||
below computes the minimum price:
|
below computes the minimum price:
|
||||||
|
|
||||||
[source,js]
|
[source,js]
|
||||||
|
|
|
@ -84,6 +84,10 @@ PUT place_path_category
|
||||||
NOTE: Adding context mappings increases the index size for completion field. The completion index
|
NOTE: Adding context mappings increases the index size for completion field. The completion index
|
||||||
is entirely heap resident, you can monitor the completion field index size using <<indices-stats>>.
|
is entirely heap resident, you can monitor the completion field index size using <<indices-stats>>.
|
||||||
|
|
||||||
|
NOTE: deprecated[7.0.0, Indexing a suggestion without context on a context enabled completion field is deprecated
|
||||||
|
and will be removed in the next major release. If you want to index a suggestion that matches all contexts you should
|
||||||
|
add a special context for it.]
|
||||||
|
|
||||||
[[suggester-context-category]]
|
[[suggester-context-category]]
|
||||||
[float]
|
[float]
|
||||||
==== Category Context
|
==== Category Context
|
||||||
|
@ -156,9 +160,9 @@ POST place/_search?pretty
|
||||||
// CONSOLE
|
// CONSOLE
|
||||||
// TEST[continued]
|
// TEST[continued]
|
||||||
|
|
||||||
NOTE: When no categories are provided at query-time, all indexed documents are considered.
|
Note: deprecated[7.0.0, When no categories are provided at query-time, all indexed documents are considered.
|
||||||
Querying with no categories on a category enabled completion field should be avoided, as it
|
Querying with no categories on a category enabled completion field is deprecated and will be removed in the next major release
|
||||||
will degrade search performance.
|
as it degrades search performance considerably.]
|
||||||
|
|
||||||
Suggestions with certain categories can be boosted higher than others.
|
Suggestions with certain categories can be boosted higher than others.
|
||||||
The following filters suggestions by categories and additionally boosts
|
The following filters suggestions by categories and additionally boosts
|
||||||
|
|
|
@ -336,16 +336,80 @@ setup:
|
||||||
- length: { suggest.result.0.options: 1 }
|
- length: { suggest.result.0.options: 1 }
|
||||||
- match: { suggest.result.0.options.0.text: "foo" }
|
- match: { suggest.result.0.options.0.text: "foo" }
|
||||||
|
|
||||||
|
---
|
||||||
|
"Indexing and Querying without contexts is deprecated":
|
||||||
|
- skip:
|
||||||
|
version: " - 6.99.99"
|
||||||
|
reason: this feature was deprecated in 7.0
|
||||||
|
features: "warnings"
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
|
index:
|
||||||
|
index: test
|
||||||
|
type: test
|
||||||
|
id: 1
|
||||||
|
body:
|
||||||
|
suggest_context:
|
||||||
|
input: "foo"
|
||||||
|
contexts:
|
||||||
|
color: "red"
|
||||||
|
suggest_multi_contexts:
|
||||||
|
input: "bar"
|
||||||
|
contexts:
|
||||||
|
color: "blue"
|
||||||
|
|
||||||
|
- do:
|
||||||
|
warnings:
|
||||||
|
- "The ability to index a suggestion with no context on a context enabled completion field is deprecated and will be removed in the next major release."
|
||||||
|
index:
|
||||||
|
index: test
|
||||||
|
type: test
|
||||||
|
id: 2
|
||||||
|
body:
|
||||||
|
suggest_context:
|
||||||
|
input: "foo"
|
||||||
|
|
||||||
|
- do:
|
||||||
|
indices.refresh: {}
|
||||||
|
|
||||||
|
- do:
|
||||||
|
warnings:
|
||||||
|
- "The ability to query with no context on a context enabled completion field is deprecated and will be removed in the next major release."
|
||||||
search:
|
search:
|
||||||
body:
|
body:
|
||||||
suggest:
|
suggest:
|
||||||
result:
|
result:
|
||||||
text: "foo"
|
text: "foo"
|
||||||
completion:
|
completion:
|
||||||
skip_duplicates: true
|
|
||||||
field: suggest_context
|
field: suggest_context
|
||||||
|
|
||||||
- length: { suggest.result: 1 }
|
- length: { suggest.result: 1 }
|
||||||
- length: { suggest.result.0.options: 1 }
|
|
||||||
- match: { suggest.result.0.options.0.text: "foo" }
|
- do:
|
||||||
|
warnings:
|
||||||
|
- "The ability to query with no context on a context enabled completion field is deprecated and will be removed in the next major release."
|
||||||
|
search:
|
||||||
|
body:
|
||||||
|
suggest:
|
||||||
|
result:
|
||||||
|
text: "foo"
|
||||||
|
completion:
|
||||||
|
field: suggest_context
|
||||||
|
contexts: {}
|
||||||
|
|
||||||
|
- length: { suggest.result: 1 }
|
||||||
|
|
||||||
|
- do:
|
||||||
|
warnings:
|
||||||
|
- "The ability to query with no context on a context enabled completion field is deprecated and will be removed in the next major release."
|
||||||
|
search:
|
||||||
|
body:
|
||||||
|
suggest:
|
||||||
|
result:
|
||||||
|
text: "foo"
|
||||||
|
completion:
|
||||||
|
field: suggest_multi_contexts
|
||||||
|
contexts:
|
||||||
|
location: []
|
||||||
|
|
||||||
|
- length: { suggest.result: 1 }
|
||||||
|
|
|
@ -19,6 +19,8 @@ setup:
|
||||||
"type" : "category"
|
"type" : "category"
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
|
warnings:
|
||||||
|
- "The ability to index a suggestion with no context on a context enabled completion field is deprecated and will be removed in the next major release."
|
||||||
bulk:
|
bulk:
|
||||||
refresh: true
|
refresh: true
|
||||||
index: test
|
index: test
|
||||||
|
@ -31,8 +33,14 @@ setup:
|
||||||
|
|
||||||
---
|
---
|
||||||
"Test typed keys parameter for suggesters":
|
"Test typed keys parameter for suggesters":
|
||||||
|
- skip:
|
||||||
|
version: " - 6.99.99"
|
||||||
|
reason: queying a context suggester with no context was deprecated in 7.0
|
||||||
|
features: "warnings"
|
||||||
|
|
||||||
- do:
|
- do:
|
||||||
|
warnings:
|
||||||
|
- "The ability to query with no context on a context enabled completion field is deprecated and will be removed in the next major release."
|
||||||
search:
|
search:
|
||||||
typed_keys: true
|
typed_keys: true
|
||||||
body:
|
body:
|
||||||
|
|
|
@ -83,7 +83,6 @@ import static org.elasticsearch.index.mapper.TypeParsers.parseMultiField;
|
||||||
* for query-time filtering and boosting (see {@link ContextMappings}
|
* for query-time filtering and boosting (see {@link ContextMappings}
|
||||||
*/
|
*/
|
||||||
public class CompletionFieldMapper extends FieldMapper implements ArrayValueMapperParser {
|
public class CompletionFieldMapper extends FieldMapper implements ArrayValueMapperParser {
|
||||||
|
|
||||||
public static final String CONTENT_TYPE = "completion";
|
public static final String CONTENT_TYPE = "completion";
|
||||||
|
|
||||||
public static class Defaults {
|
public static class Defaults {
|
||||||
|
|
|
@ -57,6 +57,7 @@ import java.util.Objects;
|
||||||
* indexing.
|
* indexing.
|
||||||
*/
|
*/
|
||||||
public class CompletionSuggestionBuilder extends SuggestionBuilder<CompletionSuggestionBuilder> {
|
public class CompletionSuggestionBuilder extends SuggestionBuilder<CompletionSuggestionBuilder> {
|
||||||
|
|
||||||
private static final XContentType CONTEXT_BYTES_XCONTENT_TYPE = XContentType.JSON;
|
private static final XContentType CONTEXT_BYTES_XCONTENT_TYPE = XContentType.JSON;
|
||||||
static final String SUGGESTION_NAME = "completion";
|
static final String SUGGESTION_NAME = "completion";
|
||||||
static final ParseField CONTEXTS_FIELD = new ParseField("contexts", "context");
|
static final ParseField CONTEXTS_FIELD = new ParseField("contexts", "context");
|
||||||
|
|
|
@ -25,6 +25,8 @@ import org.apache.lucene.search.suggest.document.ContextSuggestField;
|
||||||
import org.apache.lucene.util.CharsRefBuilder;
|
import org.apache.lucene.util.CharsRefBuilder;
|
||||||
import org.elasticsearch.ElasticsearchParseException;
|
import org.elasticsearch.ElasticsearchParseException;
|
||||||
import org.elasticsearch.Version;
|
import org.elasticsearch.Version;
|
||||||
|
import org.elasticsearch.common.logging.DeprecationLogger;
|
||||||
|
import org.elasticsearch.common.logging.Loggers;
|
||||||
import org.elasticsearch.common.xcontent.ToXContent;
|
import org.elasticsearch.common.xcontent.ToXContent;
|
||||||
import org.elasticsearch.common.xcontent.XContentBuilder;
|
import org.elasticsearch.common.xcontent.XContentBuilder;
|
||||||
import org.elasticsearch.index.mapper.CompletionFieldMapper;
|
import org.elasticsearch.index.mapper.CompletionFieldMapper;
|
||||||
|
@ -51,6 +53,10 @@ import static org.elasticsearch.search.suggest.completion.context.ContextMapping
|
||||||
* for a {@link CompletionFieldMapper}
|
* for a {@link CompletionFieldMapper}
|
||||||
*/
|
*/
|
||||||
public class ContextMappings implements ToXContent {
|
public class ContextMappings implements ToXContent {
|
||||||
|
|
||||||
|
private static final DeprecationLogger DEPRECATION_LOGGER =
|
||||||
|
new DeprecationLogger(Loggers.getLogger(ContextMappings.class));
|
||||||
|
|
||||||
private final List<ContextMapping> contextMappings;
|
private final List<ContextMapping> contextMappings;
|
||||||
private final Map<String, ContextMapping> contextNameMap;
|
private final Map<String, ContextMapping> contextNameMap;
|
||||||
|
|
||||||
|
@ -143,6 +149,10 @@ public class ContextMappings implements ToXContent {
|
||||||
scratch.setLength(1);
|
scratch.setLength(1);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (typedContexts.isEmpty()) {
|
||||||
|
DEPRECATION_LOGGER.deprecated("The ability to index a suggestion with no context on a context enabled completion field" +
|
||||||
|
" is deprecated and will be removed in the next major release.");
|
||||||
|
}
|
||||||
return typedContexts;
|
return typedContexts;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -156,6 +166,7 @@ public class ContextMappings implements ToXContent {
|
||||||
*/
|
*/
|
||||||
public ContextQuery toContextQuery(CompletionQuery query, Map<String, List<ContextMapping.InternalQueryContext>> queryContexts) {
|
public ContextQuery toContextQuery(CompletionQuery query, Map<String, List<ContextMapping.InternalQueryContext>> queryContexts) {
|
||||||
ContextQuery typedContextQuery = new ContextQuery(query);
|
ContextQuery typedContextQuery = new ContextQuery(query);
|
||||||
|
boolean hasContext = false;
|
||||||
if (queryContexts.isEmpty() == false) {
|
if (queryContexts.isEmpty() == false) {
|
||||||
CharsRefBuilder scratch = new CharsRefBuilder();
|
CharsRefBuilder scratch = new CharsRefBuilder();
|
||||||
scratch.grow(1);
|
scratch.grow(1);
|
||||||
|
@ -169,10 +180,15 @@ public class ContextMappings implements ToXContent {
|
||||||
scratch.append(context.context);
|
scratch.append(context.context);
|
||||||
typedContextQuery.addContext(scratch.toCharsRef(), context.boost, !context.isPrefix);
|
typedContextQuery.addContext(scratch.toCharsRef(), context.boost, !context.isPrefix);
|
||||||
scratch.setLength(1);
|
scratch.setLength(1);
|
||||||
|
hasContext = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if (hasContext == false) {
|
||||||
|
DEPRECATION_LOGGER.deprecated("The ability to query with no context on a context enabled completion field is deprecated " +
|
||||||
|
"and will be removed in the next major release.");
|
||||||
|
}
|
||||||
return typedContextQuery;
|
return typedContextQuery;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -41,8 +41,6 @@ import org.elasticsearch.xpack.monitoring.exporter.Exporter;
|
||||||
import org.joda.time.format.DateTimeFormatter;
|
import org.joda.time.format.DateTimeFormatter;
|
||||||
|
|
||||||
import javax.net.ssl.SSLContext;
|
import javax.net.ssl.SSLContext;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
@ -658,12 +656,12 @@ public class HttpExporter extends Exporter {
|
||||||
if (sniffer != null) {
|
if (sniffer != null) {
|
||||||
sniffer.close();
|
sniffer.close();
|
||||||
}
|
}
|
||||||
} catch (IOException | RuntimeException e) {
|
} catch (Exception e) {
|
||||||
logger.error("an error occurred while closing the internal client sniffer", e);
|
logger.error("an error occurred while closing the internal client sniffer", e);
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
client.close();
|
client.close();
|
||||||
} catch (IOException | RuntimeException e) {
|
} catch (Exception e) {
|
||||||
logger.error("an error occurred while closing the internal client", e);
|
logger.error("an error occurred while closing the internal client", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -86,7 +86,7 @@ class NodeFailureListener extends RestClient.FailureListener {
|
||||||
resource.markDirty();
|
resource.markDirty();
|
||||||
}
|
}
|
||||||
if (sniffer != null) {
|
if (sniffer != null) {
|
||||||
sniffer.sniffOnFailure(host);
|
sniffer.sniffOnFailure();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -46,7 +46,7 @@ public class NodeFailureListenerTests extends ESTestCase {
|
||||||
|
|
||||||
listener.onFailure(host);
|
listener.onFailure(host);
|
||||||
|
|
||||||
verify(sniffer).sniffOnFailure(host);
|
verify(sniffer).sniffOnFailure();
|
||||||
}
|
}
|
||||||
|
|
||||||
public void testResourceNotifiedOnFailure() {
|
public void testResourceNotifiedOnFailure() {
|
||||||
|
@ -71,7 +71,7 @@ public class NodeFailureListenerTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (optionalSniffer != null) {
|
if (optionalSniffer != null) {
|
||||||
verify(sniffer).sniffOnFailure(host);
|
verify(sniffer).sniffOnFailure();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue