Decouple HostsSniffer from Sniffer

Sniffer now requires a HostsSniffer instance as a constructor argument, HostsSniffer has its won Builder helper. Also synchronized accesses to scheduledExecutorService in SnifferTask.
This commit is contained in:
javanna 2016-06-09 14:27:30 +02:00 committed by Luca Cavanna
parent 04d620da74
commit be5e2e145b
5 changed files with 206 additions and 104 deletions

View File

@ -36,6 +36,8 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Objects;
import java.util.concurrent.TimeUnit;
/** /**
* Class responsible for sniffing the http hosts from elasticsearch through the nodes info api and returning them back. * Class responsible for sniffing the http hosts from elasticsearch through the nodes info api and returning them back.
@ -47,14 +49,13 @@ public class HostsSniffer {
private final RestClient restClient; private final RestClient restClient;
private final Map<String, String> sniffRequestParams; private final Map<String, String> sniffRequestParams;
private final String scheme; private final Scheme scheme;
private final JsonFactory jsonFactory; private final JsonFactory jsonFactory = new JsonFactory();
public HostsSniffer(RestClient restClient, long sniffRequestTimeout, String scheme) { protected HostsSniffer(RestClient restClient, long sniffRequestTimeout, Scheme scheme) {
this.restClient = restClient; this.restClient = restClient;
this.sniffRequestParams = Collections.<String, String>singletonMap("timeout", sniffRequestTimeout + "ms"); this.sniffRequestParams = Collections.<String, String>singletonMap("timeout", sniffRequestTimeout + "ms");
this.scheme = scheme; this.scheme = scheme;
this.jsonFactory = new JsonFactory();
} }
/** /**
@ -95,7 +96,7 @@ public class HostsSniffer {
} }
} }
private static HttpHost readHost(String nodeId, JsonParser parser, String scheme) throws IOException { private static HttpHost readHost(String nodeId, JsonParser parser, Scheme scheme) throws IOException {
HttpHost httpHost = null; HttpHost httpHost = null;
String fieldName = null; String fieldName = null;
while (parser.nextToken() != JsonToken.END_OBJECT) { while (parser.nextToken() != JsonToken.END_OBJECT) {
@ -124,4 +125,71 @@ public class HostsSniffer {
} }
return httpHost; return httpHost;
} }
/**
* Returns a new {@link Builder} to help with {@link HostsSniffer} creation.
*/
public static Builder builder(RestClient restClient) {
return new Builder(restClient);
}
public enum Scheme {
HTTP("http"), HTTPS("https");
private final String name;
Scheme(String name) {
this.name = name;
}
@Override
public String toString() {
return name;
}
}
/**
* HostsSniffer builder. Helps creating a new {@link HostsSniffer}.
*/
public static class Builder {
public static final long DEFAULT_SNIFF_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(1);
private final RestClient restClient;
private long sniffRequestTimeout = DEFAULT_SNIFF_REQUEST_TIMEOUT;
private Scheme scheme;
private Builder(RestClient restClient) {
Objects.requireNonNull(restClient, "restClient cannot be null");
this.restClient = restClient;
}
/**
* Sets the sniff request timeout to be passed in as a query string parameter to elasticsearch.
* Allows to halt the request without any failure, as only the nodes that have responded
* within this timeout will be returned.
*/
public Builder setSniffRequestTimeout(int sniffRequestTimeout) {
if (sniffRequestTimeout <= 0) {
throw new IllegalArgumentException("sniffRequestTimeout must be greater than 0");
}
this.sniffRequestTimeout = sniffRequestTimeout;
return this;
}
/**
* Sets the scheme to associate sniffed nodes with (as it is not returned by elasticsearch)
*/
public Builder setScheme(Scheme scheme) {
Objects.requireNonNull(scheme, "scheme cannot be null");
this.scheme = scheme;
return this;
}
/**
* Creates a new {@link HostsSniffer} instance given the provided configuration
*/
public HostsSniffer build() {
return new HostsSniffer(restClient, sniffRequestTimeout, scheme);
}
}
} }

View File

@ -48,9 +48,8 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab
private final boolean sniffOnFailure; private final boolean sniffOnFailure;
private final Task task; private final Task task;
private Sniffer(RestClient restClient, long sniffRequestTimeout, String scheme, long sniffInterval, private Sniffer(RestClient restClient, HostsSniffer hostsSniffer, long sniffInterval,
boolean sniffOnFailure, long sniffAfterFailureDelay) { boolean sniffOnFailure, long sniffAfterFailureDelay) {
HostsSniffer hostsSniffer = new HostsSniffer(restClient, sniffRequestTimeout, scheme);
this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay); this.task = new Task(hostsSniffer, restClient, sniffInterval, sniffAfterFailureDelay);
this.sniffOnFailure = sniffOnFailure; this.sniffOnFailure = sniffOnFailure;
restClient.setFailureListener(this); restClient.setFailureListener(this);
@ -77,8 +76,7 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab
private final long sniffAfterFailureDelay; private final long sniffAfterFailureDelay;
private final ScheduledExecutorService scheduledExecutorService; private final ScheduledExecutorService scheduledExecutorService;
private final AtomicBoolean running = new AtomicBoolean(false); private final AtomicBoolean running = new AtomicBoolean(false);
private volatile long nextSniffDelay; private ScheduledFuture<?> scheduledFuture;
private volatile ScheduledFuture<?> scheduledFuture;
private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffInterval, long sniffAfterFailureDelay) { private Task(HostsSniffer hostsSniffer, RestClient restClient, long sniffInterval, long sniffAfterFailureDelay) {
this.hostsSniffer = hostsSniffer; this.hostsSniffer = hostsSniffer;
@ -86,21 +84,34 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab
this.sniffInterval = sniffInterval; this.sniffInterval = sniffInterval;
this.sniffAfterFailureDelay = sniffAfterFailureDelay; this.sniffAfterFailureDelay = sniffAfterFailureDelay;
this.scheduledExecutorService = Executors.newScheduledThreadPool(1); this.scheduledExecutorService = Executors.newScheduledThreadPool(1);
this.scheduledFuture = this.scheduledExecutorService.schedule(this, 0, TimeUnit.MILLISECONDS); scheduleNextRun(0);
this.nextSniffDelay = sniffInterval; }
synchronized void scheduleNextRun(long delayMillis) {
if (scheduledExecutorService.isShutdown() == false) {
try {
if (scheduledFuture != null) {
//regardless of when the next sniff is scheduled, cancel it and schedule a new one with updated delay
this.scheduledFuture.cancel(false);
}
logger.debug("scheduling next sniff in " + delayMillis + " ms");
this.scheduledFuture = this.scheduledExecutorService.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
} catch(Throwable t) {
logger.error("error while scheduling next sniffer task", t);
}
}
} }
@Override @Override
public void run() { public void run() {
sniff(null); sniff(null, sniffInterval);
} }
void sniffOnFailure(HttpHost failedHost) { void sniffOnFailure(HttpHost failedHost) {
this.nextSniffDelay = sniffAfterFailureDelay; sniff(failedHost, sniffAfterFailureDelay);
sniff(failedHost);
} }
void sniff(HttpHost excludeHost) { void sniff(HttpHost excludeHost, long nextSniffDelayMillis) {
if (running.compareAndSet(false, true)) { if (running.compareAndSet(false, true)) {
try { try {
List<HttpHost> sniffedNodes = hostsSniffer.sniffHosts(); List<HttpHost> sniffedNodes = hostsSniffer.sniffHosts();
@ -112,22 +123,13 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab
} catch (Throwable t) { } catch (Throwable t) {
logger.error("error while sniffing nodes", t); logger.error("error while sniffing nodes", t);
} finally { } finally {
try { scheduleNextRun(nextSniffDelayMillis);
//regardless of whether and when the next sniff is scheduled, cancel it and schedule a new one with updated delay
this.scheduledFuture.cancel(false);
logger.debug("scheduling next sniff in " + nextSniffDelay + " ms");
this.scheduledFuture = this.scheduledExecutorService.schedule(this, nextSniffDelay, TimeUnit.MILLISECONDS);
} catch (Throwable t) {
logger.error("error while scheduling next sniffer task", t);
} finally {
this.nextSniffDelay = sniffInterval;
running.set(false); running.set(false);
} }
} }
} }
}
void shutdown() { synchronized void shutdown() {
scheduledExecutorService.shutdown(); scheduledExecutorService.shutdown();
try { try {
if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) { if (scheduledExecutorService.awaitTermination(1000, TimeUnit.MILLISECONDS)) {
@ -143,8 +145,8 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab
/** /**
* Returns a new {@link Builder} to help with {@link Sniffer} creation. * Returns a new {@link Builder} to help with {@link Sniffer} creation.
*/ */
public static Builder builder(RestClient restClient) { public static Builder builder(RestClient restClient, HostsSniffer hostsSniffer) {
return new Builder(restClient); return new Builder(restClient, hostsSniffer);
} }
/** /**
@ -153,21 +155,22 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab
public static final class Builder { public static final class Builder {
public static final long DEFAULT_SNIFF_INTERVAL = TimeUnit.MINUTES.toMillis(5); public static final long DEFAULT_SNIFF_INTERVAL = TimeUnit.MINUTES.toMillis(5);
public static final long DEFAULT_SNIFF_AFTER_FAILURE_DELAY = TimeUnit.MINUTES.toMillis(1); public static final long DEFAULT_SNIFF_AFTER_FAILURE_DELAY = TimeUnit.MINUTES.toMillis(1);
public static final long DEFAULT_SNIFF_REQUEST_TIMEOUT = TimeUnit.SECONDS.toMillis(1);
private final RestClient restClient; private final RestClient restClient;
private long sniffRequestTimeout = DEFAULT_SNIFF_REQUEST_TIMEOUT; private final HostsSniffer hostsSniffer;
private long sniffInterval = DEFAULT_SNIFF_INTERVAL; private long sniffInterval = DEFAULT_SNIFF_INTERVAL;
private boolean sniffOnFailure = true; private boolean sniffOnFailure = true;
private long sniffAfterFailureDelay = DEFAULT_SNIFF_AFTER_FAILURE_DELAY; private long sniffAfterFailureDelay = DEFAULT_SNIFF_AFTER_FAILURE_DELAY;
private String scheme = "http";
/** /**
* Creates a new builder instance and sets the {@link RestClient} that will be used to communicate with elasticsearch. * Creates a new builder instance by providing the {@link RestClient} that will be used to communicate with elasticsearch,
* and the
*/ */
private Builder(RestClient restClient) { private Builder(RestClient restClient, HostsSniffer hostsSniffer) {
Objects.requireNonNull(restClient, "restClient cannot be null"); Objects.requireNonNull(restClient, "restClient cannot be null");
this.restClient = restClient; this.restClient = restClient;
Objects.requireNonNull(hostsSniffer, "hostsSniffer cannot be null");
this.hostsSniffer = hostsSniffer;
} }
/** /**
@ -203,37 +206,11 @@ public final class Sniffer extends RestClient.FailureListener implements Closeab
return this; return this;
} }
/**
* Sets the sniff request timeout to be passed in as a query string parameter to elasticsearch.
* Allows to halt the request without any failure, as only the nodes that have responded
* within this timeout will be returned.
*/
public Builder setSniffRequestTimeout(int sniffRequestTimeout) {
if (sniffRequestTimeout <= 0) {
throw new IllegalArgumentException("sniffRequestTimeout must be greater than 0");
}
this.sniffRequestTimeout = sniffRequestTimeout;
return this;
}
/**
* Sets the scheme to be used for sniffed nodes. This information is not returned by elasticsearch,
* default is http but should be customized if https is needed/enabled.
*/
public Builder setScheme(String scheme) {
Objects.requireNonNull(scheme, "scheme cannot be null");
if (scheme.equals("http") == false && scheme.equals("https") == false) {
throw new IllegalArgumentException("scheme must be either http or https");
}
this.scheme = scheme;
return this;
}
/** /**
* Creates the {@link Sniffer} based on the provided configuration. * Creates the {@link Sniffer} based on the provided configuration.
*/ */
public Sniffer build() { public Sniffer build() {
return new Sniffer(restClient, sniffRequestTimeout, scheme, sniffInterval, sniffOnFailure, sniffAfterFailureDelay); return new Sniffer(restClient, hostsSniffer, sniffInterval, sniffOnFailure, sniffAfterFailureDelay);
} }
} }
} }

View File

@ -0,0 +1,69 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client.sniff;
import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.http.HttpHost;
import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.RestClient;
public class HostsSnifferBuilderTests extends LuceneTestCase {
public void testBuild() throws Exception {
try {
HostsSniffer.builder(null);
fail("should have failed");
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "restClient cannot be null");
}
int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
HttpHost[] hosts = new HttpHost[numNodes];
for (int i = 0; i < numNodes; i++) {
hosts[i] = new HttpHost("localhost", 9200 + i);
}
try (RestClient client = RestClient.builder(hosts).build()) {
try {
HostsSniffer.builder(client).setScheme(null);
fail("should have failed");
} catch(NullPointerException e) {
assertEquals(e.getMessage(), "scheme cannot be null");
}
try {
HostsSniffer.builder(client).setSniffRequestTimeout(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0");
}
HostsSniffer.Builder builder = HostsSniffer.builder(client);
if (random().nextBoolean()) {
builder.setScheme(RandomPicks.randomFrom(random(), HostsSniffer.Scheme.values()));
}
if (random().nextBoolean()) {
builder.setSniffRequestTimeout(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
}
assertNotNull(builder.build());
}
}
}

View File

@ -43,7 +43,6 @@ import java.io.StringWriter;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.URISyntaxException; import java.net.URISyntaxException;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
@ -58,14 +57,14 @@ import static org.hamcrest.CoreMatchers.equalTo;
public class HostsSnifferTests extends LuceneTestCase { public class HostsSnifferTests extends LuceneTestCase {
private int sniffRequestTimeout; private int sniffRequestTimeout;
private String scheme; private HostsSniffer.Scheme scheme;
private SniffResponse sniffResponse; private SniffResponse sniffResponse;
private HttpServer httpServer; private HttpServer httpServer;
@Before @Before
public void startHttpServer() throws IOException { public void startHttpServer() throws IOException {
this.sniffRequestTimeout = RandomInts.randomIntBetween(random(), 1000, 10000); this.sniffRequestTimeout = RandomInts.randomIntBetween(random(), 1000, 10000);
this.scheme = RandomPicks.randomFrom(random(), Arrays.asList("http", "https")); this.scheme = RandomPicks.randomFrom(random(), HostsSniffer.Scheme.values());
if (rarely()) { if (rarely()) {
this.sniffResponse = SniffResponse.buildFailure(); this.sniffResponse = SniffResponse.buildFailure();
} else { } else {
@ -132,7 +131,7 @@ public class HostsSnifferTests extends LuceneTestCase {
return httpServer; return httpServer;
} }
private static SniffResponse buildSniffResponse(String scheme) throws IOException { private static SniffResponse buildSniffResponse(HostsSniffer.Scheme scheme) throws IOException {
int numNodes = RandomInts.randomIntBetween(random(), 1, 5); int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
List<HttpHost> hosts = new ArrayList<>(numNodes); List<HttpHost> hosts = new ArrayList<>(numNodes);
JsonFactory jsonFactory = new JsonFactory(); JsonFactory jsonFactory = new JsonFactory();
@ -164,7 +163,7 @@ public class HostsSnifferTests extends LuceneTestCase {
if (isHttpEnabled) { if (isHttpEnabled) {
String host = "host" + i; String host = "host" + i;
int port = RandomInts.randomIntBetween(random(), 9200, 9299); int port = RandomInts.randomIntBetween(random(), 9200, 9299);
HttpHost httpHost = new HttpHost(host, port, scheme); HttpHost httpHost = new HttpHost(host, port, scheme.toString());
hosts.add(httpHost); hosts.add(httpHost);
generator.writeObjectFieldStart("http"); generator.writeObjectFieldStart("http");
if (random().nextBoolean()) { if (random().nextBoolean()) {

View File

@ -20,87 +20,65 @@
package org.elasticsearch.client.sniff; package org.elasticsearch.client.sniff;
import com.carrotsearch.randomizedtesting.generators.RandomInts; import com.carrotsearch.randomizedtesting.generators.RandomInts;
import com.carrotsearch.randomizedtesting.generators.RandomPicks;
import org.apache.http.HttpHost; import org.apache.http.HttpHost;
import org.apache.lucene.util.LuceneTestCase; import org.apache.lucene.util.LuceneTestCase;
import org.elasticsearch.client.RestClient; import org.elasticsearch.client.RestClient;
import java.util.Arrays; import java.io.IOException;
import java.util.logging.LogManager; import java.util.Collections;
import java.util.List;
public class SnifferBuilderTests extends LuceneTestCase { public class SnifferBuilderTests extends LuceneTestCase {
static {
LogManager.getLogManager().reset();
}
public void testBuild() throws Exception { public void testBuild() throws Exception {
int numNodes = RandomInts.randomIntBetween(random(), 1, 5); int numNodes = RandomInts.randomIntBetween(random(), 1, 5);
HttpHost[] hosts = new HttpHost[numNodes]; HttpHost[] hosts = new HttpHost[numNodes];
for (int i = 0; i < numNodes; i++) { for (int i = 0; i < numNodes; i++) {
hosts[i] = new HttpHost("localhost", 9200 + i); hosts[i] = new HttpHost("localhost", 9200 + i);
} }
try (RestClient client = RestClient.builder(hosts).build()) {
HostsSniffer hostsSniffer = new MockHostsSniffer();
try (RestClient client = RestClient.builder(hosts).build()) {
try { try {
Sniffer.builder(client).setScheme(null); Sniffer.builder(null, hostsSniffer).build();
fail("should have failed"); fail("should have failed");
} catch(NullPointerException e) { } catch(NullPointerException e) {
assertEquals("scheme cannot be null", e.getMessage()); assertEquals("restClient cannot be null", e.getMessage());
} }
try { try {
Sniffer.builder(client).setScheme("whatever"); Sniffer.builder(client, null).build();
fail("should have failed"); fail("should have failed");
} catch(IllegalArgumentException e) { } catch(NullPointerException e) {
assertEquals("scheme must be either http or https", e.getMessage()); assertEquals("hostsSniffer cannot be null", e.getMessage());
} }
try { try {
Sniffer.builder(client).setSniffInterval(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); Sniffer.builder(client, hostsSniffer).setSniffInterval(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed"); fail("should have failed");
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
assertEquals("sniffInterval must be greater than 0", e.getMessage()); assertEquals("sniffInterval must be greater than 0", e.getMessage());
} }
try { try {
Sniffer.builder(client).setSniffRequestTimeout(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0)); Sniffer.builder(client, hostsSniffer).setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed");
} catch(IllegalArgumentException e) {
assertEquals("sniffRequestTimeout must be greater than 0", e.getMessage());
}
try {
Sniffer.builder(client).setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0));
fail("should have failed"); fail("should have failed");
} catch(IllegalArgumentException e) { } catch(IllegalArgumentException e) {
assertEquals("sniffAfterFailureDelay must be greater than 0", e.getMessage()); assertEquals("sniffAfterFailureDelay must be greater than 0", e.getMessage());
} }
try { try (Sniffer sniffer = Sniffer.builder(client, hostsSniffer).build()) {
Sniffer.builder(null).build();
fail("should have failed");
} catch(NullPointerException e) {
assertEquals("restClient cannot be null", e.getMessage());
}
try (Sniffer sniffer = Sniffer.builder(client).build()) {
assertNotNull(sniffer); assertNotNull(sniffer);
} }
Sniffer.Builder builder = Sniffer.builder(client); Sniffer.Builder builder = Sniffer.builder(client, hostsSniffer);
if (random().nextBoolean()) {
builder.setScheme(RandomPicks.randomFrom(random(), Arrays.asList("http", "https")));
}
if (random().nextBoolean()) { if (random().nextBoolean()) {
builder.setSniffInterval(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); builder.setSniffInterval(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
} }
if (random().nextBoolean()) { if (random().nextBoolean()) {
builder.setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE)); builder.setSniffAfterFailureDelay(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
} }
if (random().nextBoolean()) {
builder.setSniffRequestTimeout(RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE));
}
if (random().nextBoolean()) { if (random().nextBoolean()) {
builder.setSniffOnFailure(random().nextBoolean()); builder.setSniffOnFailure(random().nextBoolean());
} }
@ -109,4 +87,15 @@ public class SnifferBuilderTests extends LuceneTestCase {
} }
} }
} }
private static class MockHostsSniffer extends HostsSniffer {
MockHostsSniffer() {
super(null, -1, null);
}
@Override
public List<HttpHost> sniffHosts() throws IOException {
return Collections.singletonList(new HttpHost("localhost", 9200));
}
}
} }