diff --git a/client-sniffer/src/main/java/org/elasticsearch/client/sniff/SniffOnFailureListener.java b/client-sniffer/src/main/java/org/elasticsearch/client/sniff/SniffOnFailureListener.java new file mode 100644 index 00000000000..76350057141 --- /dev/null +++ b/client-sniffer/src/main/java/org/elasticsearch/client/sniff/SniffOnFailureListener.java @@ -0,0 +1,65 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.sniff; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.RestClient; + +import java.io.IOException; +import java.util.Objects; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * {@link org.elasticsearch.client.RestClient.FailureListener} implementation that allows to perform + * sniffing on failure. Gets notified whenever a failure happens and uses a {@link Sniffer} instance + * to manually reload hosts and sets them back to the {@link RestClient}. The {@link Sniffer} instance + * needs to be lazily set through {@link #setSniffer(Sniffer)}. + */ +public class SniffOnFailureListener extends RestClient.FailureListener { + + private volatile Sniffer sniffer; + private final AtomicBoolean set; + + public SniffOnFailureListener() { + this.set = new AtomicBoolean(false); + } + + /** + * Sets the {@link Sniffer} instance used to perform sniffing + * @throws IllegalStateException if the sniffer was already set, as it can only be set once + */ + public void setSniffer(Sniffer sniffer) { + Objects.requireNonNull(sniffer, "sniffer must not be null"); + if (set.compareAndSet(false, true)) { + this.sniffer = sniffer; + } else { + throw new IllegalStateException("sniffer can only be set once"); + } + } + + @Override + public void onFailure(HttpHost host) throws IOException { + if (sniffer == null) { + throw new IllegalStateException("sniffer was not set, unable to sniff on failure"); + } + //re-sniff immediately but take out the node that failed + sniffer.sniffOnFailure(host); + } +} diff --git a/client-sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java b/client-sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java index c1aeb3efe36..74a28cdd222 100644 --- a/client-sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java +++ b/client-sniffer/src/main/java/org/elasticsearch/client/sniff/Sniffer.java @@ -37,30 +37,26 @@ import java.util.concurrent.atomic.AtomicBoolean; /** * Class responsible for sniffing nodes from an elasticsearch cluster and setting them to a provided instance of {@link RestClient}. * Must be created via {@link Builder}, which allows to set all of the different options or rely on defaults. - * A background task fetches the nodes from elasticsearch and updates them periodically. - * Supports sniffing on failure, meaning that the client will notify the sniffer at each host failure, so that nodes can be updated - * straightaway. + * A background task fetches the nodes through the {@link HostsSniffer} and sets them to the {@link RestClient} instance. + * It is possible to perform sniffing on failure by creating a {@link SniffOnFailureListener} and providing it as an argument to + * {@link org.elasticsearch.client.RestClient.Builder#setFailureListener(RestClient.FailureListener)}. The Sniffer implementation + * needs to be lazily set to the previously created SniffOnFailureListener through {@link SniffOnFailureListener#setSniffer(Sniffer)}. */ -public final class Sniffer extends RestClient.FailureListener implements Closeable { +public final class Sniffer implements Closeable { private static final Log logger = LogFactory.getLog(Sniffer.class); - private final boolean sniffOnFailure; private final Task task; - private Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, - boolean sniffOnFailure, long sniffAfterFailureDelay) { + private Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval, long sniffAfterFailureDelay) { this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay); - this.sniffOnFailure = sniffOnFailure; - restClient.setFailureListener(this); } - @Override - public void onFailure(HttpHost host) throws IOException { - if (sniffOnFailure) { - //re-sniff immediately but take out the node that failed - task.sniffOnFailure(host); - } + /** + * Triggers a new sniffing round and explicitly takes out the failed host provided as argument + */ + public void sniffOnFailure(HttpHost failedHost) { + this.task.sniffOnFailure(failedHost); } @Override @@ -114,12 +110,16 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab void sniff(HttpHost excludeHost, long nextSniffDelayMillis) { if (running.compareAndSet(false, true)) { try { - List sniffedNodes = hostsSniffer.sniffHosts(); + List sniffedHosts = hostsSniffer.sniffHosts(); + logger.debug("sniffed hosts: " + sniffedHosts); if (excludeHost != null) { - sniffedNodes.remove(excludeHost); + sniffedHosts.remove(excludeHost); + } + if (sniffedHosts.isEmpty()) { + logger.warn("no hosts to set, hosts will be updated at the next sniffing round"); + } else { + this.restClient.setHosts(sniffedHosts.toArray(new HttpHost[sniffedHosts.size()])); } - logger.debug("sniffed nodes: " + sniffedNodes); - this.restClient.setHosts(sniffedNodes.toArray(new HttpHost[sniffedNodes.size()])); } catch (Exception e) { logger.error("error while sniffing nodes", e); } finally { @@ -159,7 +159,6 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab private final RestClient restClient; private final HostsSniffer hostsSniffer; private long sniffIntervalMillis = DEFAULT_SNIFF_INTERVAL; - private boolean sniffOnFailure = true; private long sniffAfterFailureDelayMillis = DEFAULT_SNIFF_AFTER_FAILURE_DELAY; /** @@ -186,15 +185,6 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab return this; } - /** - * Enables/disables sniffing on failure. If enabled, at each failure nodes will be reloaded, and a new sniff execution will - * be scheduled after a shorter time than usual (sniffAfterFailureDelayMillis). - */ - public Builder setSniffOnFailure(boolean sniffOnFailure) { - this.sniffOnFailure = sniffOnFailure; - return this; - } - /** * Sets the delay of a sniff execution scheduled after a failure (in milliseconds) */ @@ -210,7 +200,7 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab * Creates the {@link Sniffer} based on the provided configuration. */ public Sniffer build() { - return new Sniffer(restClient, hostsSniffer, sniffIntervalMillis, sniffOnFailure, sniffAfterFailureDelayMillis); + return new Sniffer(restClient, hostsSniffer, sniffIntervalMillis, sniffAfterFailureDelayMillis); } } } diff --git a/client-sniffer/src/test/java/org/elasticsearch/client/sniff/MockHostsSniffer.java b/client-sniffer/src/test/java/org/elasticsearch/client/sniff/MockHostsSniffer.java new file mode 100644 index 00000000000..bdc052d07c8 --- /dev/null +++ b/client-sniffer/src/test/java/org/elasticsearch/client/sniff/MockHostsSniffer.java @@ -0,0 +1,39 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.sniff; + +import org.apache.http.HttpHost; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +class MockHostsSniffer extends HostsSniffer { + MockHostsSniffer() { + super(null, -1, null); + } + + @Override + public List sniffHosts() throws IOException { + List hosts = new ArrayList<>(); + hosts.add(new HttpHost("localhost", 9200)); + return hosts; + } +} diff --git a/client-sniffer/src/test/java/org/elasticsearch/client/sniff/SniffOnFailureListenerTests.java b/client-sniffer/src/test/java/org/elasticsearch/client/sniff/SniffOnFailureListenerTests.java new file mode 100644 index 00000000000..fe2555763ac --- /dev/null +++ b/client-sniffer/src/test/java/org/elasticsearch/client/sniff/SniffOnFailureListenerTests.java @@ -0,0 +1,57 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.client.sniff; + +import org.apache.http.HttpHost; +import org.apache.lucene.util.LuceneTestCase; +import org.elasticsearch.client.RestClient; + +public class SniffOnFailureListenerTests extends LuceneTestCase { + + public void testSetSniffer() throws Exception { + SniffOnFailureListener listener = new SniffOnFailureListener(); + + try { + listener.onFailure(null); + fail("should have failed"); + } catch(IllegalStateException e) { + assertEquals("sniffer was not set, unable to sniff on failure", e.getMessage()); + } + + try { + listener.setSniffer(null); + fail("should have failed"); + } catch(NullPointerException e) { + assertEquals("sniffer must not be null", e.getMessage()); + } + + RestClient restClient = RestClient.builder(new HttpHost("localhost", 9200)).build(); + try (Sniffer sniffer = Sniffer.builder(restClient, new MockHostsSniffer()).build()) { + listener.setSniffer(sniffer); + try { + listener.setSniffer(sniffer); + fail("should have failed"); + } catch(IllegalStateException e) { + assertEquals("sniffer can only be set once", e.getMessage()); + } + listener.onFailure(new HttpHost("localhost", 9200)); + } + } +} diff --git a/client-sniffer/src/test/java/org/elasticsearch/client/sniff/SnifferBuilderTests.java b/client-sniffer/src/test/java/org/elasticsearch/client/sniff/SnifferBuilderTests.java index f09ef77f5cf..a3e8d8e80e7 100644 --- a/client-sniffer/src/test/java/org/elasticsearch/client/sniff/SnifferBuilderTests.java +++ b/client-sniffer/src/test/java/org/elasticsearch/client/sniff/SnifferBuilderTests.java @@ -24,10 +24,6 @@ import org.apache.http.HttpHost; import org.apache.lucene.util.LuceneTestCase; import org.elasticsearch.client.RestClient; -import java.io.IOException; -import java.util.Collections; -import java.util.List; - public class SnifferBuilderTests extends LuceneTestCase { public void testBuild() throws Exception { @@ -80,23 +76,9 @@ public class SnifferBuilderTests extends LuceneTestCase { if (random().nextBoolean()) { builder.setSniffAfterFailureDelayMillis(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); } - if (random().nextBoolean()) { - builder.setSniffOnFailure(random().nextBoolean()); - } try (Sniffer sniffer = builder.build()) { assertNotNull(sniffer); } } } - - private static class MockHostsSniffer extends HostsSniffer { - MockHostsSniffer() { - super(null, -1, null); - } - - @Override - public List sniffHosts() throws IOException { - return Collections.singletonList(new HttpHost("localhost", 9200)); - } - } } diff --git a/client/src/main/java/org/elasticsearch/client/RestClient.java b/client/src/main/java/org/elasticsearch/client/RestClient.java index 2d1c50af3cb..9baafb2b5b7 100644 --- a/client/src/main/java/org/elasticsearch/client/RestClient.java +++ b/client/src/main/java/org/elasticsearch/client/RestClient.java @@ -89,12 +89,14 @@ public final class RestClient implements Closeable { private final AtomicInteger lastHostIndex = new AtomicInteger(0); private volatile Set hosts; private final ConcurrentMap blacklist = new ConcurrentHashMap<>(); - private volatile FailureListener failureListener = new FailureListener(); + private final FailureListener failureListener; - private RestClient(CloseableHttpClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders, HttpHost[] hosts) { + private RestClient(CloseableHttpClient client, long maxRetryTimeoutMillis, Header[] defaultHeaders, + HttpHost[] hosts, FailureListener failureListener) { this.client = client; this.maxRetryTimeoutMillis = maxRetryTimeoutMillis; this.defaultHeaders = defaultHeaders; + this.failureListener = failureListener; setHosts(hosts); } @@ -278,13 +280,6 @@ public final class RestClient implements Closeable { failureListener.onFailure(host); } - /** - * Sets a {@link FailureListener} to be notified each and every time a host fails - */ - public synchronized void setFailureListener(FailureListener failureListener) { - this.failureListener = failureListener; - } - @Override public void close() throws IOException { client.close(); @@ -368,6 +363,7 @@ public final class RestClient implements Closeable { private CloseableHttpClient httpClient; private int maxRetryTimeout = DEFAULT_MAX_RETRY_TIMEOUT_MILLIS; private Header[] defaultHeaders = EMPTY_HEADERS; + private FailureListener failureListener; /** * Creates a new builder instance and sets the hosts that the client will send requests to. @@ -418,6 +414,15 @@ public final class RestClient implements Closeable { return this; } + /** + * Sets the {@link FailureListener} to be notified for each request failure + */ + public Builder setFailureListener(FailureListener failureListener) { + Objects.requireNonNull(failureListener, "failure listener must not be null"); + this.failureListener = failureListener; + return this; + } + /** * Creates a new {@link RestClient} based on the provided configuration. */ @@ -425,7 +430,10 @@ public final class RestClient implements Closeable { if (httpClient == null) { httpClient = createDefaultHttpClient(null); } - return new RestClient(httpClient, maxRetryTimeout, defaultHeaders, hosts); + if (failureListener == null) { + failureListener = new FailureListener(); + } + return new RestClient(httpClient, maxRetryTimeout, defaultHeaders, hosts, failureListener); } /** diff --git a/client/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java b/client/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java index 88b1406d925..cb7de44cd5e 100644 --- a/client/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java +++ b/client/src/test/java/org/elasticsearch/client/RestClientBuilderTests.java @@ -74,6 +74,13 @@ public class RestClientBuilderTests extends LuceneTestCase { assertEquals("default header must not be null", e.getMessage()); } + try { + RestClient.builder(new HttpHost("localhost", 9200)).setFailureListener(null); + fail("should have failed"); + } catch(NullPointerException e) { + assertEquals("failure listener must not be null", e.getMessage()); + } + int numNodes = RandomInts.randomIntBetween(random(), 1, 5); HttpHost[] hosts = new HttpHost[numNodes]; for (int i = 0; i < numNodes; i++) { diff --git a/client/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java b/client/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java index 64792c2bb1b..beae3cfc9f1 100644 --- a/client/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java +++ b/client/src/test/java/org/elasticsearch/client/RestClientMultipleHostsTests.java @@ -87,14 +87,10 @@ public class RestClientMultipleHostsTests extends LuceneTestCase { for (int i = 0; i < numHosts; i++) { httpHosts[i] = new HttpHost("localhost", 9200 + i); } - restClient = RestClient.builder(httpHosts).setHttpClient(httpClient).build(); failureListener = new TrackingFailureListener(); - restClient.setFailureListener(failureListener); + restClient = RestClient.builder(httpHosts).setHttpClient(httpClient).setFailureListener(failureListener).build(); } - /** - * Test that - */ public void testRoundRobinOkStatusCodes() throws Exception { int numIters = RandomInts.randomIntBetween(random(), 1, 5); for (int i = 0; i < numIters; i++) { diff --git a/client/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java b/client/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java index e2657722701..ef2a09fb570 100644 --- a/client/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java +++ b/client/src/test/java/org/elasticsearch/client/RestClientSingleHostTests.java @@ -122,9 +122,9 @@ public class RestClientSingleHostTests extends LuceneTestCase { defaultHeaders[i] = new BasicHeader(headerName, headerValue); } httpHost = new HttpHost("localhost", 9200); - restClient = RestClient.builder(httpHost).setHttpClient(httpClient).setDefaultHeaders(defaultHeaders).build(); failureListener = new TrackingFailureListener(); - restClient.setFailureListener(failureListener); + restClient = RestClient.builder(httpHost).setHttpClient(httpClient).setDefaultHeaders(defaultHeaders) + .setFailureListener(failureListener).build(); } /**