diff --git a/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java b/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java index c043dd17059..50923fa5406 100644 --- a/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/AbstractStaticConnectionPool.java @@ -47,23 +47,12 @@ public abstract class AbstractStaticConnectionPool implements ConnectionPool connectionSelector; - private final AtomicInteger lastConnectionIndex = new AtomicInteger(0); - protected AbstractStaticConnectionPool(Predicate connectionSelector) { - Objects.requireNonNull(connectionSelector, "connection selector predicate cannot be null"); - this.connectionSelector = connectionSelector; - } - protected abstract List getConnections(); @Override public final Stream nextConnection() { - return nextUnfilteredConnection().filter(connectionSelector); - } - - protected final Stream nextUnfilteredConnection() { List connections = getConnections(); if (connections.isEmpty()) { throw new IllegalStateException("no connections available in the connection pool"); diff --git a/client/src/main/java/org/elasticsearch/client/SniffingConnectionPool.java b/client/src/main/java/org/elasticsearch/client/SniffingConnectionPool.java index 25d17b45ee5..eaaf98d0506 100644 --- a/client/src/main/java/org/elasticsearch/client/SniffingConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/SniffingConnectionPool.java @@ -49,8 +49,7 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool { public SniffingConnectionPool(int sniffInterval, boolean sniffOnFailure, int sniffAfterFailureDelay, CloseableHttpClient client, RequestConfig sniffRequestConfig, int sniffRequestTimeout, Scheme scheme, - Predicate connectionSelector, HttpHost... hosts) { - super(connectionSelector); + HttpHost... hosts) { if (sniffInterval <= 0) { throw new IllegalArgumentException("sniffInterval must be greater than 0"); } @@ -129,7 +128,7 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool { void sniff(Predicate hostFilter) { if (running.compareAndSet(false, true)) { try { - Iterator connectionIterator = nextUnfilteredConnection().iterator(); + Iterator connectionIterator = nextConnection().iterator(); if (connectionIterator.hasNext()) { sniff(connectionIterator, hostFilter); } else { diff --git a/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java b/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java index 539b7ad0f1f..602298b1b3b 100644 --- a/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java +++ b/client/src/main/java/org/elasticsearch/client/StaticConnectionPool.java @@ -32,7 +32,6 @@ import org.apache.http.util.EntityUtils; import java.io.IOException; import java.util.List; import java.util.Objects; -import java.util.function.Predicate; public class StaticConnectionPool extends AbstractStaticConnectionPool { @@ -43,9 +42,7 @@ public class StaticConnectionPool extends AbstractStaticConnectionPool { private final RequestConfig pingRequestConfig; private final List connections; - public StaticConnectionPool(CloseableHttpClient client, boolean pingEnabled, RequestConfig pingRequestConfig, - Predicate connectionSelector, HttpHost... hosts) { - super(connectionSelector); + public StaticConnectionPool(CloseableHttpClient client, boolean pingEnabled, RequestConfig pingRequestConfig, HttpHost... hosts) { Objects.requireNonNull(client, "client cannot be null"); Objects.requireNonNull(pingRequestConfig, "pingRequestConfig cannot be null"); if (hosts == null || hosts.length == 0) { diff --git a/client/src/test/java/org/elasticsearch/client/SniffingConnectionPoolTests.java b/client/src/test/java/org/elasticsearch/client/SniffingConnectionPoolTests.java index 97b7d0867c6..1ae9279f78e 100644 --- a/client/src/test/java/org/elasticsearch/client/SniffingConnectionPoolTests.java +++ b/client/src/test/java/org/elasticsearch/client/SniffingConnectionPoolTests.java @@ -47,7 +47,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { @@ -58,7 +58,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { assertEquals(e.getMessage(), "sniffAfterFailureDelay must be greater than 0"); @@ -68,7 +68,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), null, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(NullPointerException e) { assertEquals(e.getMessage(), "client cannot be null"); @@ -78,7 +78,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, null, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(NullPointerException e) { assertEquals(e.getMessage(), "sniffRequestConfig cannot be null"); @@ -88,7 +88,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0"); @@ -98,18 +98,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), null, hosts)) { - fail("pool creation should have failed " + connectionPool); - } catch(NullPointerException e) { - assertEquals(e.getMessage(), "connection selector predicate cannot be null"); - } - - try (SniffingConnectionPool connectionPool = new SniffingConnectionPool( - RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), - RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, - RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), - connection -> random().nextBoolean(), (HttpHost[])null)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), (HttpHost[])null)) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { assertEquals(e.getMessage(), "no hosts provided"); @@ -119,8 +108,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), - (HttpHost) null)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), (HttpHost) null)) { fail("pool creation should have failed " + connectionPool); } catch(NullPointerException e) { assertEquals(e.getMessage(), "host cannot be null"); @@ -130,7 +118,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean())) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()))) { fail("pool creation should have failed " + connectionPool); } catch(IllegalArgumentException e) { assertEquals(e.getMessage(), "no hosts provided"); @@ -140,7 +128,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase { RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(), RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT, RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), - RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) { + RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) { assertNotNull(sniffingConnectionPool); } } diff --git a/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java b/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java index 82e8c60b02f..60398b71c0d 100644 --- a/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java +++ b/client/src/test/java/org/elasticsearch/client/StaticConnectionPoolTests.java @@ -43,45 +43,37 @@ public class StaticConnectionPoolTests extends LuceneTestCase { } try { - new StaticConnectionPool(null, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean(), hosts); + new StaticConnectionPool(null, random().nextBoolean(), RequestConfig.DEFAULT, hosts); } catch(NullPointerException e) { assertEquals(e.getMessage(), "client cannot be null"); } try { - new StaticConnectionPool(httpClient, random().nextBoolean(), null, connection -> random().nextBoolean(), hosts); + new StaticConnectionPool(httpClient, random().nextBoolean(), null, hosts); } catch(NullPointerException e) { assertEquals(e.getMessage(), "pingRequestConfig cannot be null"); } try { - new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, null, hosts); - } catch(NullPointerException e) { - assertEquals(e.getMessage(), "connection selector predicate cannot be null"); - } - - try { - new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, - connection -> random().nextBoolean(), (HttpHost) null); + new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, (HttpHost) null); } catch(NullPointerException e) { assertEquals(e.getMessage(), "host cannot be null"); } try { - new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, - connection -> random().nextBoolean(), (HttpHost[])null); + new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, (HttpHost[])null); } catch(IllegalArgumentException e) { assertEquals(e.getMessage(), "no hosts provided"); } try { - new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean()); + new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT); } catch(IllegalArgumentException e) { assertEquals(e.getMessage(), "no hosts provided"); } StaticConnectionPool staticConnectionPool = new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, - connection -> random().nextBoolean(), hosts); + hosts); assertNotNull(staticConnectionPool); } }