remove streams and java 8 only api, build with source and target 1.7

javanna 2016-05-09 19:45:14 +02:00 committed by Luca Cavanna
parent 9569ebc262
commit 599dad560c
5 changed files with 48 additions and 27 deletions

View File

@@ -17,11 +17,15 @@
  * under the License.
  */
-import org.elasticsearch.gradle.precommit.PrecommitTasks;
+import org.elasticsearch.gradle.precommit.PrecommitTasks
+import org.gradle.api.JavaVersion
 
 group = 'org.elasticsearch.client'
 apply plugin: 'elasticsearch.build'
 
+targetCompatibility = JavaVersion.VERSION_1_7
+sourceCompatibility = JavaVersion.VERSION_1_7
+
 dependencies {
   // TODO once we got rid of the client in the test framework we should use a version variable here
   compile "org.apache.httpcomponents:httpclient:4.5.2"
@@ -47,8 +51,9 @@ dependencies {
   testCompile "org.bouncycastle:bcprov-jdk15on:1.54"
 }
 
-compileJava.options.compilerArgs << '-Xlint:-cast,-rawtypes,-try,-unchecked'
-compileTestJava.options.compilerArgs << '-Xlint:-rawtypes'
+//TODO compiling from 1.8 with target 1.7 and source 1.7 is best effort, not enough to ensure we are java 7 compatible
+compileJava.options.compilerArgs << '-target' << '1.7' << '-source' << '1.7' << '-Xlint:all,-path,-serial,-options'
+compileTestJava.options.compilerArgs << '-target' << '1.7' << '-source' << '1.7'
 
 forbiddenApisMain {
   //client does not depend on core, so only jdk signatures should be checked

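The TODO added above points at a real limitation: with javac 8, '-source 1.7' and '-target 1.7' only pin the language level and the class-file version, not the class library, so code can still reference APIs that exist only in the JDK 8 runtime (which is presumably also why the new -Xlint line drops the 'options' category, the one that warns about compiling to 1.7 without a 1.7 bootclasspath). A minimal sketch, not part of the client, of code such a build accepts but that breaks on a Java 7 JRE:

    import java.util.Base64; // class added in Java 8; javac 8 resolves it even with -source/-target 1.7

    public class EncodeExample {
        public static void main(String[] args) {
            // Compiles to a 51.0 (Java 7) class file, but on a Java 7 runtime this line
            // fails with NoClassDefFoundError: java/util/Base64.
            System.out.println(Base64.getEncoder().encodeToString("elasticsearch".getBytes()));
        }
    }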
View File

@@ -27,10 +27,11 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.stream.Stream;
 
 /**
  * Pool of connections to the different hosts that belong to an elasticsearch cluster.
@@ -66,7 +67,7 @@ public abstract class ConnectionPool implements Closeable {
      * It may happen that the stream is empty, in which case it means that there aren't healthy connections to use.
      * Then {@link #lastResortConnection()} should be called to retrieve a non healthy connection and try it.
      */
-    public final Stream<Connection> nextConnection() {
+    public final Iterator<Connection> nextConnection() {
         List<Connection> connections = getConnections();
         if (connections.isEmpty()) {
             throw new IllegalStateException("no connections available in the connection pool");
@@ -75,7 +76,14 @@ public abstract class ConnectionPool implements Closeable {
         List<Connection> sortedConnections = new ArrayList<>(connections);
         //TODO is it possible to make this O(1)? (rotate is O(n))
         Collections.rotate(sortedConnections, sortedConnections.size() - lastConnectionIndex.getAndIncrement());
-        return sortedConnections.stream().filter(connection -> connection.isAlive() || connection.shouldBeRetried());
+        Iterator<Connection> connectionIterator = sortedConnections.iterator();
+        while (connectionIterator.hasNext()) {
+            Connection connection = connectionIterator.next();
+            if (connection.isAlive() == false && connection.shouldBeRetried() == false) {
+                connectionIterator.remove();
+            }
+        }
+        return sortedConnections.iterator();
     }
     /**
@@ -96,10 +104,18 @@ public abstract class ConnectionPool implements Closeable {
      * only in case {@link #nextConnection()} returns an empty stream.
      */
     public final Connection lastResortConnection() {
-        Connection Connection = getConnections().stream()
-                .sorted((o1, o2) -> Long.compare(o1.getDeadUntil(), o2.getDeadUntil())).findFirst().get();
-        Connection.markResurrected();
-        return Connection;
+        List<Connection> connections = getConnections();
+        if (connections.isEmpty()) {
+            throw new IllegalStateException("no connections available in the connection pool");
+        }
+        List<Connection> sortedConnections = new ArrayList<>(connections);
+        Collections.sort(sortedConnections, new Comparator<Connection>() {
+            @Override
+            public int compare(Connection o1, Connection o2) {
+                return Long.compare(o1.getDeadUntil(), o2.getDeadUntil());
+            }
+        });
+        return sortedConnections.get(0);
     }
 
     /**

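The javadoc above describes the intended calling pattern: consume the iterator returned by nextConnection() and, when it is empty, fall back to lastResortConnection(); the RestClient and sniffer changes below follow exactly that shape. A minimal sketch of such a caller, assuming the Connection class lives in the same org.elasticsearch.client package (the ConnectionPicker helper and pickConnection method are made up for illustration):

    import java.util.Iterator;

    import org.elasticsearch.client.Connection;
    import org.elasticsearch.client.ConnectionPool;

    // Hypothetical helper, not part of this commit.
    class ConnectionPicker {
        static Connection pickConnection(ConnectionPool connectionPool) {
            Iterator<Connection> connectionIterator = connectionPool.nextConnection();
            if (connectionIterator.hasNext()) {
                // alive (or retryable) connections, already rotated for round-robin
                return connectionIterator.next();
            }
            // nothing healthy left: fall back to the connection whose dead timeout expires soonest
            return connectionPool.lastResortConnection();
        }
    }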
View File

@@ -39,11 +39,11 @@ import java.io.Closeable;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.Locale;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Stream;
 
 public final class RestClient implements Closeable {
@@ -63,11 +63,11 @@ public final class RestClient implements Closeable {
             throws IOException {
         URI uri = buildUri(endpoint, params);
         HttpRequestBase request = createHttpRequest(method, uri, entity);
-        Iterator<Connection> connectionIterator = connectionPool.nextConnection().iterator();
+        Iterator<Connection> connectionIterator = connectionPool.nextConnection();
         if (connectionIterator.hasNext() == false) {
             Connection connection = connectionPool.lastResortConnection();
             logger.info("no healthy nodes available, trying " + connection.getHost());
-            return performRequest(request, Stream.of(connection).iterator());
+            return performRequest(request, Collections.singleton(connection).iterator());
         }
         return performRequest(request, connectionIterator);
     }

View File

@@ -29,6 +29,7 @@ import org.elasticsearch.client.ConnectionPool;
 import org.elasticsearch.client.RestClient;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Objects;
@@ -37,8 +38,6 @@ import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.function.Predicate;
-import java.util.stream.Stream;
 
 /**
  * Connection pool implementation that sniffs nodes from elasticsearch at regular intervals.
@@ -97,25 +96,25 @@ public class SniffingConnectionPool extends ConnectionPool {
         @Override
         public void run() {
-            sniff(node -> true);
+            sniff(null);
         }
 
         void sniffOnFailure(HttpHost failedHost) {
             //sync sniff straightaway on failure
             failure = true;
-            sniff(host -> host.equals(failedHost) == false);
+            sniff(failedHost);
         }
 
-        void sniff(Predicate<HttpHost> hostFilter) {
+        void sniff(HttpHost excludeHost) {
            if (running.compareAndSet(false, true)) {
                try {
-                   Iterator<Connection> connectionIterator = nextConnection().iterator();
+                   Iterator<Connection> connectionIterator = nextConnection();
                    if (connectionIterator.hasNext()) {
-                       sniff(connectionIterator, hostFilter);
+                       sniff(connectionIterator, excludeHost);
                    } else {
                        Connection connection = lastResortConnection();
                        logger.info("no healthy nodes available, trying " + connection.getHost());
-                       sniff(Stream.of(connection).iterator(), hostFilter);
+                       sniff(Collections.singleton(connection).iterator(), excludeHost);
                    }
                } catch (Throwable t) {
                    logger.error("error while sniffing nodes", t);
@@ -139,15 +138,16 @@ public class SniffingConnectionPool extends ConnectionPool {
            }
        }
 
-        void sniff(Iterator<Connection> connectionIterator, Predicate<HttpHost> hostFilter) throws IOException {
+        void sniff(Iterator<Connection> connectionIterator, HttpHost excludeHost) throws IOException {
            IOException lastSeenException = null;
            while (connectionIterator.hasNext()) {
                Connection connection = connectionIterator.next();
                try {
                    List<HttpHost> sniffedNodes = sniffer.sniffNodes(connection.getHost());
-                   HttpHost[] filteredNodes = sniffedNodes.stream().filter(hostFilter).toArray(HttpHost[]::new);
-                   logger.debug("adding " + filteredNodes.length + " nodes out of " + sniffedNodes.size() + " sniffed nodes");
-                   connections = createConnections(filteredNodes);
+                   if (excludeHost != null) {
+                       sniffedNodes.remove(excludeHost);
+                   }
+                   connections = createConnections(sniffedNodes.toArray(new HttpHost[sniffedNodes.size()]));
                    onSuccess(connection);
                    return;
                } catch (IOException e) {

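Replacing the Predicate<HttpHost> parameter with a plain excludeHost argument is the simpler route here, since both lambdas and java.util.function are Java 8-only. Had a general filter still been needed, a hand-rolled single-method interface would also have worked on Java 7; a sketch of that alternative, not something the commit introduces (the HostFilter name is hypothetical):

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    import org.apache.http.HttpHost;

    // Java 7 compatible stand-in for java.util.function.Predicate<HttpHost>.
    interface HostFilter {
        boolean keep(HttpHost host);
    }

    class HostFilterExample {
        static List<HttpHost> filter(List<HttpHost> hosts, HostFilter filter) {
            List<HttpHost> kept = new ArrayList<>();
            for (HttpHost host : hosts) {
                if (filter.keep(host)) {
                    kept.add(host);
                }
            }
            return kept;
        }

        public static void main(String[] args) {
            final HttpHost failedHost = new HttpHost("localhost", 9200);
            List<HttpHost> sniffed = Arrays.asList(failedHost, new HttpHost("localhost", 9201));
            // anonymous class instead of the former lambda: host -> host.equals(failedHost) == false
            List<HttpHost> filtered = filter(sniffed, new HostFilter() {
                @Override
                public boolean keep(HttpHost host) {
                    return host.equals(failedHost) == false;
                }
            });
            System.out.println(filtered);
        }
    }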
View File

@@ -115,7 +115,7 @@ public class SnifferTests extends LuceneTestCase {
         }
     }
 
-    private static MockWebServer buildMockWebServer(SniffResponse sniffResponse, int sniffTimeout) throws UnsupportedEncodingException {
+    private static MockWebServer buildMockWebServer(final SniffResponse sniffResponse, final int sniffTimeout) throws UnsupportedEncodingException {
         MockWebServer server = new MockWebServer();
         final Dispatcher dispatcher = new Dispatcher() {
             @Override
@@ -244,7 +244,7 @@ public class SnifferTests extends LuceneTestCase {
     }
 
     static SniffResponse buildFailure() {
-        return new SniffResponse("", Collections.emptyList(), true);
+        return new SniffResponse("", Collections.<HttpHost>emptyList(), true);
     }
 
     static SniffResponse buildResponse(String nodesInfoBody, List<HttpHost> hosts) {
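
The only change this test needs is the explicit type witness on Collections.emptyList(): under -source 1.7 a generic method call in argument position does not pick up the target type, so the call infers List<Object> and no longer matches the SniffResponse constructor; Java 8's target typing is what made the shorter form compile before. A self-contained sketch of the same effect (the Hosts class and count method are made up for illustration):

    import java.util.Collections;
    import java.util.List;

    public class Hosts {
        static int count(List<String> hosts) {
            return hosts.size();
        }

        public static void main(String[] args) {
            // Does not compile with -source 1.7: the call infers List<Object>,
            // which is not assignable to List<String>.
            // count(Collections.emptyList());

            // The explicit type witness compiles on 1.7 and later.
            System.out.println(count(Collections.<String>emptyList()));
        }
    }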