From 3efbe95ca4f53cc9fb018d5b65a1b69c77d09616 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 22 Feb 2016 11:37:57 -0800 Subject: [PATCH] RestClient prototype --- client/build.gradle | 82 +++++++ .../org/elasticsearch/client/IndexClient.java | 65 ++++++ .../org/elasticsearch/client/RestClient.java | 217 ++++++++++++++++++ .../elasticsearch/client/RestClientTests.java | 152 ++++++++++++ settings.gradle | 1 + 5 files changed, 517 insertions(+) create mode 100644 client/build.gradle create mode 100644 client/src/main/java/org/elasticsearch/client/IndexClient.java create mode 100644 client/src/main/java/org/elasticsearch/client/RestClient.java create mode 100644 client/src/test/java/org/elasticsearch/client/RestClientTests.java diff --git a/client/build.gradle b/client/build.gradle new file mode 100644 index 00000000000..1ae32bf6e98 --- /dev/null +++ b/client/build.gradle @@ -0,0 +1,82 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.elasticsearch.gradle.precommit.PrecommitTasks; + +group = 'org.elasticsearch.client' +apply plugin: 'elasticsearch.build' + +dependencies { + // we use the lucene test-framework here but we are not pulling in ES core or the test framework + testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}" + testCompile "junit:junit:${versions.junit}" + testCompile 'org.hamcrest:hamcrest-all:1.3' + testCompile "org.apache.lucene:lucene-test-framework:${versions.lucene}" + testCompile "org.apache.lucene:lucene-core:${versions.lucene}" + testCompile "org.apache.lucene:lucene-codecs:${versions.lucene}" + + + // TODO once we got rid of the client in the test framework we should use a version variable here + // we use httpclient here since the JDK support has several issue + // - httpclient supports basic and digest auth and other schemes + // - URLConnection has issues with SSL and not all system patches are available + // - URLConnection can't stream data but httpclient can + // - URLConnection doesn't expose responsecodes unless it's a 200 + // - httpclient supports pipelining which we might wanna expose down the road? + compile "org.apache.httpcomponents:httpclient:4.5.1" + compile "org.apache.httpcomponents:httpcore:4.4.4" + compile "org.apache.httpcomponents:httpcore-nio:4.4.4" // currently unused + compile "commons-logging:commons-logging:1.2" + compile 'org.apache.httpcomponents:httpasyncclient:4.1.1' // currently unused +} + + +compileJava.options.compilerArgs << '-Xlint:-cast,-rawtypes,-try,-unchecked' +compileTestJava.options.compilerArgs << '-Xlint:-rawtypes' + +// the main files are actually test files, so use the appopriate forbidden api sigs +forbiddenApisMain { + bundledSignatures = ['jdk-unsafe', 'jdk-deprecated'] + signaturesURLs = [PrecommitTasks.getResource('/forbidden/all-signatures.txt'), + PrecommitTasks.getResource('/forbidden/test-signatures.txt')] +} + +forbiddenApisTest.enabled=false +forbiddenApisMain.enabled=false + +// dependency license are currently checked in distribution +dependencyLicenses.enabled = false +jarHell.enabled = false +thirdPartyAudit.enabled = false +thirdPartyAudit.excludes = [ + // classes are missing + 'javax.servlet.ServletContextEvent', + 'javax.servlet.ServletContextListener', + 'org.apache.avalon.framework.logger.Logger', + 'org.apache.log.Hierarchy', + 'org.apache.log.Logger', + // we intentionally exclude the ant tasks because people were depending on them from their tests!!!!!!! + 'org.apache.tools.ant.BuildException', + 'org.apache.tools.ant.DirectoryScanner', + 'org.apache.tools.ant.Task', + 'org.apache.tools.ant.types.FileSet', + 'org.easymock.EasyMock', + 'org.easymock.IArgumentMatcher', + 'org.jmock.core.Constraint', +] diff --git a/client/src/main/java/org/elasticsearch/client/IndexClient.java b/client/src/main/java/org/elasticsearch/client/IndexClient.java new file mode 100644 index 00000000000..ce3905b2a12 --- /dev/null +++ b/client/src/main/java/org/elasticsearch/client/IndexClient.java @@ -0,0 +1,65 @@ +package org.elasticsearch.client; + +import java.io.IOException; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Objects; + +/** + * Created by simon on 2/16/16. + */ +public class IndexClient { + + private final RestClient client; + + public IndexClient(RestClient client) { + this.client = client; + } + + public void delete(String index, String type, String id) throws IOException { + delete(index, type, id, null); + } + public void delete(String index, String type, String id, DeleteOptions params) throws IOException { + Objects.requireNonNull(index, "index must not be null"); + Objects.requireNonNull(type, "type must not be null"); + Objects.requireNonNull(id, "id must not be null"); + String deleteEndpoint = String.format("/%s/%s/%s", index, type, id); + client.httpDelete(deleteEndpoint, params == null ? Collections.emptyMap() : params.options); + } + + public class DeleteOptions { + private final Map options = new HashMap<>(); + /** Specific write consistency setting for the operation one of "one", "quorum", "all"*/ + public void consistency(String consistency) { + options.put("consistency", consistency); + }; + /** ID of parent document */ + public void parent(String parent){ + options.put("parent", parent); + }; + /** Refresh the index after performing the operation */ + public void refresh(Boolean refresh) { + options.put("refresh", refresh); + }; + /** Specific routing value */ + public void routing(String routing) { + options.put("routing", routing); + }; + /** Explicit version number for concurrency control */ + public void version(Number version) { + options.put("version", version); + }; + /** Specific version type one of "internal", "external", "external_gte", "force" */ + public void versionType(String versionType) { + options.put("version_type", versionType); + }; + /** Explicit operation timeout */ + public void timeout(String timeout) { + options.put("timeout", timeout); + }; + } + + + +} diff --git a/client/src/main/java/org/elasticsearch/client/RestClient.java b/client/src/main/java/org/elasticsearch/client/RestClient.java new file mode 100644 index 00000000000..8c2c9f0e8ee --- /dev/null +++ b/client/src/main/java/org/elasticsearch/client/RestClient.java @@ -0,0 +1,217 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client; + +import org.apache.http.HttpEntity; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.http.NameValuePair; +import org.apache.http.StatusLine; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.HttpDelete; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPost; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.client.methods.HttpUriRequest; +import org.apache.http.client.utils.URLEncodedUtils; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.message.BasicNameValuePair; + +import java.io.BufferedReader; +import java.io.Closeable; +import java.io.IOException; +import java.io.InputStreamReader; +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.stream.Collectors; + +/** + */ +public class RestClient implements Closeable{ + + private final CloseableHttpClient client; + private volatile Set hosts; + private final String scheme; + private final Set blackList = new CopyOnWriteArraySet<>(); + + public RestClient(HttpHost... hosts) { + this("http", HttpClientBuilder.create().setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(100).build()).build(), hosts); + } + + public RestClient(String scheme, CloseableHttpClient client, HttpHost[] hosts) { + if (hosts.length == 0) { + throw new IllegalArgumentException("hosts must note be empty"); + } + this.scheme = scheme; + this.client = client; + this.hosts = new HashSet<>(Arrays.asList(hosts)); + } + + + public HttpResponse httpGet(String endpoint, Map params) throws IOException { + return httpGet(getHostIterator(true), endpoint, params); + } + + HttpResponse httpGet(Iterable hosts, String endpoint, Map params) throws IOException { + HttpUriRequest request = new HttpGet(buildUri(endpoint, pairs(params))); + return execute(request, hosts); + } + + HttpResponse httpDelete(String endpoint, Map params) throws IOException { + HttpUriRequest request = new HttpDelete(buildUri(endpoint, pairs(params))); + return execute(request, getHostIterator(true)); + } + + HttpResponse httpPut(String endpoint, HttpEntity body, Map params) throws IOException { + HttpPut request = new HttpPut(buildUri(endpoint, pairs(params))); + request.setEntity(body); + return execute(request, getHostIterator(true)); + } + + HttpResponse httpPost(String endpoint, HttpEntity body, Map params) throws IOException { + HttpPost request = new HttpPost(buildUri(endpoint, pairs(params))); + request.setEntity(body); + return execute(request, getHostIterator(true)); + } + + private List pairs(Map options) { + return options.entrySet().stream().map(e -> new BasicNameValuePair(e.getKey(), e.getValue().toString())) + .collect(Collectors.toList()); + } + + public HttpResponse execute(HttpUriRequest request, Iterable retryHosts) throws IOException { + IOException exception = null; + for (HttpHost singleHost : retryHosts) { + try { + return client.execute(singleHost, request); + } catch (IOException ex) { + if (this.hosts.contains(singleHost)) { + blackList.add(singleHost); + } + if (exception != null) { + exception.addSuppressed(ex); + } else { + exception = ex; + } + } + } + throw exception; + } + + public URI buildUri(String path, List query) { + try { + return new URI(null, null, null, -1, path, URLEncodedUtils.format(query, StandardCharsets.UTF_8), null); + } catch (URISyntaxException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + + public Set fetchNodes(HttpHost host, boolean useClientNodes, boolean local, boolean checkAvailable) throws IOException { + HttpResponse httpResponse = httpGet(Collections.singleton(host), "/_cat/nodes", Collections.singletonMap("h", "http,role")); + StatusLine statusLine = httpResponse.getStatusLine(); + if (statusLine.getStatusCode() != 200) { + throw new RuntimeException("failed to fetch nodes: " + statusLine.getReasonPhrase()); + } + HttpEntity entity = httpResponse.getEntity(); + Set hosts = new HashSet<>(); + try (BufferedReader content = new BufferedReader(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))) { + String line; + while((line = content.readLine()) != null) { + final String[] split = line.split("\\s+"); + assert split.length == 2; + String boundAddress = split[0]; + String role = split[1]; + if ("-".equals(split[0].trim()) == false) { + if ("d".equals(role.trim()) == false && useClientNodes == false) { + continue; + } + URI boundAddressAsURI = URI.create("http://" + boundAddress); + HttpHost newHost = new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(), scheme); + if (checkAvailable == false || isAvailable(newHost)) { + hosts.add(newHost); + } + } + } + } + return hosts; + } + + public String getClusterName(HttpHost host) throws IOException { + HttpResponse httpResponse = httpGet(Collections.singleton(host), "/_cat/health", Collections.singletonMap("h", "cluster")); + StatusLine statusLine = httpResponse.getStatusLine(); + if (statusLine.getStatusCode() != 200) { + throw new RuntimeException("failed to fetch nodes: " + statusLine.getReasonPhrase()); + } + HttpEntity entity = httpResponse.getEntity(); + try (BufferedReader content = new BufferedReader(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))) { + String clusterName = content.readLine().trim(); + if (clusterName.length() == 0) { + throw new IllegalStateException("clustername must not be empty"); + } + return clusterName; + } + } + + public boolean isAvailable(HttpHost host) { + try { + HttpResponse httpResponse = httpGet(Collections.singleton(host), "/", Collections.emptyMap()); + StatusLine statusLine = httpResponse.getStatusLine(); + return statusLine.getStatusCode() == 200; + } catch (IOException ex) { + return false; + } + } + + public synchronized void setNodes(Set hosts) { + this.hosts = Collections.unmodifiableSet(new HashSet<>(hosts)); + blackList.retainAll(hosts); + } + + public Set getHosts() { + return hosts; + } + + protected Iterable getHostIterator(boolean clearBlacklist) { + if (hosts.size() == blackList.size() && clearBlacklist) { + blackList.clear(); // lets try again + } + return () -> hosts.stream().filter((h) -> blackList.contains(h) == false).iterator(); + } + + int getNumHosts() { + return hosts.size(); + } + + int getNumBlacklistedHosts() { + return blackList.size(); + } + @Override + public void close() throws IOException { + client.close(); + } +} diff --git a/client/src/test/java/org/elasticsearch/client/RestClientTests.java b/client/src/test/java/org/elasticsearch/client/RestClientTests.java new file mode 100644 index 00000000000..366ae692339 --- /dev/null +++ b/client/src/test/java/org/elasticsearch/client/RestClientTests.java @@ -0,0 +1,152 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.elasticsearch.client; +import com.sun.net.httpserver.HttpExchange; +import com.sun.net.httpserver.HttpServer; +import org.apache.http.HttpHost; +import org.apache.http.HttpResponse; +import org.apache.lucene.util.LuceneTestCase; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import java.net.URISyntaxException; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.ExecutionException; +public class RestClientTests extends LuceneTestCase { + //TODO this should be refactored into a base test!! + HttpServer server; + protected String clusterName = "elasticsearch"; + protected List additionalNodes = Collections.emptyList(); + + + public void setUp() throws Exception { + super.setUp(); + server = HttpServer.create(new InetSocketAddress(0), 0); + server.setExecutor(null); // creates a default executor + server.start(); + server.createContext("/", (t) -> { + handle("/", t); + }); + server.createContext("/_cat/nodes", (t) -> { + handle("/_cat/nodes", t); + }); + server.createContext("/_cat/health", (t) -> { + handle("/_cat/health", t); + }); + } + + protected void handle(String path, HttpExchange t) throws IOException { + final String response; + switch (path) { + case "/": + response = "{}"; + break; + case "/_cat/nodes": + StringBuilder builder = new StringBuilder( "127.0.0.1:" + server.getAddress().getPort() + " " + "d\n"); + for (String host : additionalNodes) { + builder.append(host).append("\n"); + } + response = builder.toString(); + break; + case "/_cat/health": + response = clusterName; + break; + default: + throw new IllegalArgumentException("no such handler " + path); + } + t.sendResponseHeaders(200, response.length()); + OutputStream os = t.getResponseBody(); + os.write(response.getBytes()); + os.close(); + } + + public void tearDown() throws Exception { + super.tearDown(); + server.stop(0); + } + + public void testGetClustername() throws IOException { + HttpHost httpHost = new HttpHost("127.0.0.1", server.getAddress().getPort(), "http"); + try(RestClient client = new RestClient(httpHost)) { + assertEquals(clusterName, client.getClusterName(httpHost)); + } + } + + public void testFetchNodes() throws IOException { + additionalNodes = Arrays.asList("127.0.0.2:9200 c", "127.0.0.3:9200 d"); + HttpHost httpHost = new HttpHost("127.0.0.1", server.getAddress().getPort(), "http"); + try(RestClient client = new RestClient(httpHost)) { + assertEquals(3, client.fetchNodes(httpHost, true, true, false).size()); + assertTrue(client.fetchNodes(httpHost, true, true, false).toString(), client.fetchNodes(httpHost, true, true, false).contains(new HttpHost("127.0.0.2", 9200, "http"))); + assertTrue(client.fetchNodes(httpHost, true, true, false).contains(new HttpHost("127.0.0.3", 9200, "http"))); + assertTrue(client.fetchNodes(httpHost, true, true, false).contains(httpHost)); + assertEquals(1, client.fetchNodes(httpHost, true, true, true).size()); + } + } + + public void testSimpleRetry() throws IOException{ + additionalNodes = Arrays.asList("127.0.0.2:9200 c", "127.0.0.3:9200 d"); + HttpHost httpHost = new HttpHost("127.0.0.1", server.getAddress().getPort(), "http"); + try(RestClient client = new RestClient(httpHost)) { + client.setNodes(client.fetchNodes(httpHost, true, true, false)); + HttpResponse httpResponse = client.httpGet("/_cat/health", Collections.emptyMap()); + assertEquals(httpResponse.getStatusLine().getStatusCode(), 200); + server.stop(0); + try { + client.httpGet("/_cat/health", Collections.emptyMap()); + fail(); + } catch (IOException ex) { + assertTrue(ex.getMessage(), ex.getMessage().endsWith("failed: connect timed out") || ex.getMessage().endsWith("failed: Connection refused")); + } + } + } + + public void testBlacklist() throws IOException{ + additionalNodes = Arrays.asList("127.0.0.2:9200 c", "127.0.0.3:9200 d"); + HttpHost httpHost = new HttpHost("127.0.0.1", server.getAddress().getPort(), "http"); + try(RestClient client = new RestClient(httpHost)) { + client.setNodes(client.fetchNodes(httpHost, true, true, false)); + assertEquals(3, client.getNumHosts()); + assertEquals(0, client.getNumBlacklistedHosts()); + server.stop(0); + try { + client.httpGet("/_cat/health", Collections.emptyMap()); + fail(); + } catch (IOException ex) { + assertTrue(ex.getMessage(), ex.getMessage().endsWith("failed: connect timed out") || ex.getMessage().endsWith("failed: Connection refused")); + } + assertEquals(3, client.getNumHosts()); + assertEquals(3, client.getNumBlacklistedHosts()); + int num = 0; + for (HttpHost host : client.getHostIterator(false)) { + num++; // nothing here + } + assertEquals(0, num); + for (HttpHost host : client.getHostIterator(true)) { + num++; // all there - we have to retry now + } + assertEquals(3, num); + } + } + + +} diff --git a/settings.gradle b/settings.gradle index 215b436e20a..831b2c5d2fb 100644 --- a/settings.gradle +++ b/settings.gradle @@ -5,6 +5,7 @@ List projects = [ 'rest-api-spec', 'core', 'docs', + 'client', 'distribution:integ-test-zip', 'distribution:zip', 'distribution:tar',