get rid of connection selector predicate
This commit is contained in:
parent
f6a5a0a4ad
commit
ce663e9703
|
@ -47,23 +47,12 @@ public abstract class AbstractStaticConnectionPool implements ConnectionPool<Sta
|
||||||
|
|
||||||
private static final Log logger = LogFactory.getLog(AbstractStaticConnectionPool.class);
|
private static final Log logger = LogFactory.getLog(AbstractStaticConnectionPool.class);
|
||||||
|
|
||||||
private final Predicate<Connection> connectionSelector;
|
|
||||||
|
|
||||||
private final AtomicInteger lastConnectionIndex = new AtomicInteger(0);
|
private final AtomicInteger lastConnectionIndex = new AtomicInteger(0);
|
||||||
|
|
||||||
protected AbstractStaticConnectionPool(Predicate<Connection> connectionSelector) {
|
|
||||||
Objects.requireNonNull(connectionSelector, "connection selector predicate cannot be null");
|
|
||||||
this.connectionSelector = connectionSelector;
|
|
||||||
}
|
|
||||||
|
|
||||||
protected abstract List<StatefulConnection> getConnections();
|
protected abstract List<StatefulConnection> getConnections();
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public final Stream<StatefulConnection> nextConnection() {
|
public final Stream<StatefulConnection> nextConnection() {
|
||||||
return nextUnfilteredConnection().filter(connectionSelector);
|
|
||||||
}
|
|
||||||
|
|
||||||
protected final Stream<StatefulConnection> nextUnfilteredConnection() {
|
|
||||||
List<StatefulConnection> connections = getConnections();
|
List<StatefulConnection> connections = getConnections();
|
||||||
if (connections.isEmpty()) {
|
if (connections.isEmpty()) {
|
||||||
throw new IllegalStateException("no connections available in the connection pool");
|
throw new IllegalStateException("no connections available in the connection pool");
|
||||||
|
|
|
@ -49,8 +49,7 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
|
||||||
|
|
||||||
public SniffingConnectionPool(int sniffInterval, boolean sniffOnFailure, int sniffAfterFailureDelay,
|
public SniffingConnectionPool(int sniffInterval, boolean sniffOnFailure, int sniffAfterFailureDelay,
|
||||||
CloseableHttpClient client, RequestConfig sniffRequestConfig, int sniffRequestTimeout, Scheme scheme,
|
CloseableHttpClient client, RequestConfig sniffRequestConfig, int sniffRequestTimeout, Scheme scheme,
|
||||||
Predicate<Connection> connectionSelector, HttpHost... hosts) {
|
HttpHost... hosts) {
|
||||||
super(connectionSelector);
|
|
||||||
if (sniffInterval <= 0) {
|
if (sniffInterval <= 0) {
|
||||||
throw new IllegalArgumentException("sniffInterval must be greater than 0");
|
throw new IllegalArgumentException("sniffInterval must be greater than 0");
|
||||||
}
|
}
|
||||||
|
@ -129,7 +128,7 @@ public class SniffingConnectionPool extends AbstractStaticConnectionPool {
|
||||||
void sniff(Predicate<HttpHost> hostFilter) {
|
void sniff(Predicate<HttpHost> hostFilter) {
|
||||||
if (running.compareAndSet(false, true)) {
|
if (running.compareAndSet(false, true)) {
|
||||||
try {
|
try {
|
||||||
Iterator<StatefulConnection> connectionIterator = nextUnfilteredConnection().iterator();
|
Iterator<StatefulConnection> connectionIterator = nextConnection().iterator();
|
||||||
if (connectionIterator.hasNext()) {
|
if (connectionIterator.hasNext()) {
|
||||||
sniff(connectionIterator, hostFilter);
|
sniff(connectionIterator, hostFilter);
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -32,7 +32,6 @@ import org.apache.http.util.EntityUtils;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Objects;
|
import java.util.Objects;
|
||||||
import java.util.function.Predicate;
|
|
||||||
|
|
||||||
public class StaticConnectionPool extends AbstractStaticConnectionPool {
|
public class StaticConnectionPool extends AbstractStaticConnectionPool {
|
||||||
|
|
||||||
|
@ -43,9 +42,7 @@ public class StaticConnectionPool extends AbstractStaticConnectionPool {
|
||||||
private final RequestConfig pingRequestConfig;
|
private final RequestConfig pingRequestConfig;
|
||||||
private final List<StatefulConnection> connections;
|
private final List<StatefulConnection> connections;
|
||||||
|
|
||||||
public StaticConnectionPool(CloseableHttpClient client, boolean pingEnabled, RequestConfig pingRequestConfig,
|
public StaticConnectionPool(CloseableHttpClient client, boolean pingEnabled, RequestConfig pingRequestConfig, HttpHost... hosts) {
|
||||||
Predicate<Connection> connectionSelector, HttpHost... hosts) {
|
|
||||||
super(connectionSelector);
|
|
||||||
Objects.requireNonNull(client, "client cannot be null");
|
Objects.requireNonNull(client, "client cannot be null");
|
||||||
Objects.requireNonNull(pingRequestConfig, "pingRequestConfig cannot be null");
|
Objects.requireNonNull(pingRequestConfig, "pingRequestConfig cannot be null");
|
||||||
if (hosts == null || hosts.length == 0) {
|
if (hosts == null || hosts.length == 0) {
|
||||||
|
|
|
@ -47,7 +47,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
|
||||||
RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), random().nextBoolean(),
|
RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), random().nextBoolean(),
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
|
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) {
|
||||||
|
|
||||||
fail("pool creation should have failed " + connectionPool);
|
fail("pool creation should have failed " + connectionPool);
|
||||||
} catch(IllegalArgumentException e) {
|
} catch(IllegalArgumentException e) {
|
||||||
|
@ -58,7 +58,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
||||||
RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), httpClient, RequestConfig.DEFAULT,
|
RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0), httpClient, RequestConfig.DEFAULT,
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
|
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) {
|
||||||
fail("pool creation should have failed " + connectionPool);
|
fail("pool creation should have failed " + connectionPool);
|
||||||
} catch(IllegalArgumentException e) {
|
} catch(IllegalArgumentException e) {
|
||||||
assertEquals(e.getMessage(), "sniffAfterFailureDelay must be greater than 0");
|
assertEquals(e.getMessage(), "sniffAfterFailureDelay must be greater than 0");
|
||||||
|
@ -68,7 +68,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), null, RequestConfig.DEFAULT,
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), null, RequestConfig.DEFAULT,
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
|
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) {
|
||||||
fail("pool creation should have failed " + connectionPool);
|
fail("pool creation should have failed " + connectionPool);
|
||||||
} catch(NullPointerException e) {
|
} catch(NullPointerException e) {
|
||||||
assertEquals(e.getMessage(), "client cannot be null");
|
assertEquals(e.getMessage(), "client cannot be null");
|
||||||
|
@ -78,7 +78,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, null,
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, null,
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
|
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) {
|
||||||
fail("pool creation should have failed " + connectionPool);
|
fail("pool creation should have failed " + connectionPool);
|
||||||
} catch(NullPointerException e) {
|
} catch(NullPointerException e) {
|
||||||
assertEquals(e.getMessage(), "sniffRequestConfig cannot be null");
|
assertEquals(e.getMessage(), "sniffRequestConfig cannot be null");
|
||||||
|
@ -88,7 +88,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
||||||
RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0),
|
RandomInts.randomIntBetween(random(), Integer.MIN_VALUE, 0),
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
|
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) {
|
||||||
fail("pool creation should have failed " + connectionPool);
|
fail("pool creation should have failed " + connectionPool);
|
||||||
} catch(IllegalArgumentException e) {
|
} catch(IllegalArgumentException e) {
|
||||||
assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0");
|
assertEquals(e.getMessage(), "sniffRequestTimeout must be greater than 0");
|
||||||
|
@ -98,18 +98,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), null, hosts)) {
|
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), (HttpHost[])null)) {
|
||||||
fail("pool creation should have failed " + connectionPool);
|
|
||||||
} catch(NullPointerException e) {
|
|
||||||
assertEquals(e.getMessage(), "connection selector predicate cannot be null");
|
|
||||||
}
|
|
||||||
|
|
||||||
try (SniffingConnectionPool connectionPool = new SniffingConnectionPool(
|
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()),
|
|
||||||
connection -> random().nextBoolean(), (HttpHost[])null)) {
|
|
||||||
fail("pool creation should have failed " + connectionPool);
|
fail("pool creation should have failed " + connectionPool);
|
||||||
} catch(IllegalArgumentException e) {
|
} catch(IllegalArgumentException e) {
|
||||||
assertEquals(e.getMessage(), "no hosts provided");
|
assertEquals(e.getMessage(), "no hosts provided");
|
||||||
|
@ -119,8 +108,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(),
|
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), (HttpHost) null)) {
|
||||||
(HttpHost) null)) {
|
|
||||||
fail("pool creation should have failed " + connectionPool);
|
fail("pool creation should have failed " + connectionPool);
|
||||||
} catch(NullPointerException e) {
|
} catch(NullPointerException e) {
|
||||||
assertEquals(e.getMessage(), "host cannot be null");
|
assertEquals(e.getMessage(), "host cannot be null");
|
||||||
|
@ -130,7 +118,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean())) {
|
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()))) {
|
||||||
fail("pool creation should have failed " + connectionPool);
|
fail("pool creation should have failed " + connectionPool);
|
||||||
} catch(IllegalArgumentException e) {
|
} catch(IllegalArgumentException e) {
|
||||||
assertEquals(e.getMessage(), "no hosts provided");
|
assertEquals(e.getMessage(), "no hosts provided");
|
||||||
|
@ -140,7 +128,7 @@ public class SniffingConnectionPoolTests extends LuceneTestCase {
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), random().nextBoolean(),
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE), httpClient, RequestConfig.DEFAULT,
|
||||||
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
RandomInts.randomIntBetween(random(), 1, Integer.MAX_VALUE),
|
||||||
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), connection -> random().nextBoolean(), hosts)) {
|
RandomPicks.randomFrom(random(), SniffingConnectionPool.Scheme.values()), hosts)) {
|
||||||
assertNotNull(sniffingConnectionPool);
|
assertNotNull(sniffingConnectionPool);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -43,45 +43,37 @@ public class StaticConnectionPoolTests extends LuceneTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
new StaticConnectionPool(null, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean(), hosts);
|
new StaticConnectionPool(null, random().nextBoolean(), RequestConfig.DEFAULT, hosts);
|
||||||
} catch(NullPointerException e) {
|
} catch(NullPointerException e) {
|
||||||
assertEquals(e.getMessage(), "client cannot be null");
|
assertEquals(e.getMessage(), "client cannot be null");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
new StaticConnectionPool(httpClient, random().nextBoolean(), null, connection -> random().nextBoolean(), hosts);
|
new StaticConnectionPool(httpClient, random().nextBoolean(), null, hosts);
|
||||||
} catch(NullPointerException e) {
|
} catch(NullPointerException e) {
|
||||||
assertEquals(e.getMessage(), "pingRequestConfig cannot be null");
|
assertEquals(e.getMessage(), "pingRequestConfig cannot be null");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, null, hosts);
|
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, (HttpHost) null);
|
||||||
} catch(NullPointerException e) {
|
|
||||||
assertEquals(e.getMessage(), "connection selector predicate cannot be null");
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT,
|
|
||||||
connection -> random().nextBoolean(), (HttpHost) null);
|
|
||||||
} catch(NullPointerException e) {
|
} catch(NullPointerException e) {
|
||||||
assertEquals(e.getMessage(), "host cannot be null");
|
assertEquals(e.getMessage(), "host cannot be null");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT,
|
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, (HttpHost[])null);
|
||||||
connection -> random().nextBoolean(), (HttpHost[])null);
|
|
||||||
} catch(IllegalArgumentException e) {
|
} catch(IllegalArgumentException e) {
|
||||||
assertEquals(e.getMessage(), "no hosts provided");
|
assertEquals(e.getMessage(), "no hosts provided");
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT, connection -> random().nextBoolean());
|
new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT);
|
||||||
} catch(IllegalArgumentException e) {
|
} catch(IllegalArgumentException e) {
|
||||||
assertEquals(e.getMessage(), "no hosts provided");
|
assertEquals(e.getMessage(), "no hosts provided");
|
||||||
}
|
}
|
||||||
|
|
||||||
StaticConnectionPool staticConnectionPool = new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT,
|
StaticConnectionPool staticConnectionPool = new StaticConnectionPool(httpClient, random().nextBoolean(), RequestConfig.DEFAULT,
|
||||||
connection -> random().nextBoolean(), hosts);
|
hosts);
|
||||||
assertNotNull(staticConnectionPool);
|
assertNotNull(staticConnectionPool);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue