RestClient prototype

This commit is contained in:
Simon Willnauer 2016-02-22 11:37:57 -08:00 committed by Luca Cavanna
parent 24a7b7224b
commit 3efbe95ca4
5 changed files with 517 additions and 0 deletions

82
client/build.gradle Normal file
View File

@ -0,0 +1,82 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import org.elasticsearch.gradle.precommit.PrecommitTasks;
group = 'org.elasticsearch.client'
apply plugin: 'elasticsearch.build'
dependencies {
// we use the lucene test-framework here but we are not pulling in ES core or the test framework
testCompile "com.carrotsearch.randomizedtesting:randomizedtesting-runner:${versions.randomizedrunner}"
testCompile "junit:junit:${versions.junit}"
testCompile 'org.hamcrest:hamcrest-all:1.3'
testCompile "org.apache.lucene:lucene-test-framework:${versions.lucene}"
testCompile "org.apache.lucene:lucene-core:${versions.lucene}"
testCompile "org.apache.lucene:lucene-codecs:${versions.lucene}"
// TODO once we got rid of the client in the test framework we should use a version variable here
// we use httpclient here since the JDK support has several issue
// - httpclient supports basic and digest auth and other schemes
// - URLConnection has issues with SSL and not all system patches are available
// - URLConnection can't stream data but httpclient can
// - URLConnection doesn't expose responsecodes unless it's a 200
// - httpclient supports pipelining which we might wanna expose down the road?
compile "org.apache.httpcomponents:httpclient:4.5.1"
compile "org.apache.httpcomponents:httpcore:4.4.4"
compile "org.apache.httpcomponents:httpcore-nio:4.4.4" // currently unused
compile "commons-logging:commons-logging:1.2"
compile 'org.apache.httpcomponents:httpasyncclient:4.1.1' // currently unused
}
compileJava.options.compilerArgs << '-Xlint:-cast,-rawtypes,-try,-unchecked'
compileTestJava.options.compilerArgs << '-Xlint:-rawtypes'
// the main files are actually test files, so use the appopriate forbidden api sigs
forbiddenApisMain {
bundledSignatures = ['jdk-unsafe', 'jdk-deprecated']
signaturesURLs = [PrecommitTasks.getResource('/forbidden/all-signatures.txt'),
PrecommitTasks.getResource('/forbidden/test-signatures.txt')]
}
forbiddenApisTest.enabled=false
forbiddenApisMain.enabled=false
// dependency license are currently checked in distribution
dependencyLicenses.enabled = false
jarHell.enabled = false
thirdPartyAudit.enabled = false
thirdPartyAudit.excludes = [
// classes are missing
'javax.servlet.ServletContextEvent',
'javax.servlet.ServletContextListener',
'org.apache.avalon.framework.logger.Logger',
'org.apache.log.Hierarchy',
'org.apache.log.Logger',
// we intentionally exclude the ant tasks because people were depending on them from their tests!!!!!!!
'org.apache.tools.ant.BuildException',
'org.apache.tools.ant.DirectoryScanner',
'org.apache.tools.ant.Task',
'org.apache.tools.ant.types.FileSet',
'org.easymock.EasyMock',
'org.easymock.IArgumentMatcher',
'org.jmock.core.Constraint',
]

View File

@ -0,0 +1,65 @@
package org.elasticsearch.client;
import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
/**
* Created by simon on 2/16/16.
*/
public class IndexClient {
private final RestClient client;
public IndexClient(RestClient client) {
this.client = client;
}
public void delete(String index, String type, String id) throws IOException {
delete(index, type, id, null);
}
public void delete(String index, String type, String id, DeleteOptions params) throws IOException {
Objects.requireNonNull(index, "index must not be null");
Objects.requireNonNull(type, "type must not be null");
Objects.requireNonNull(id, "id must not be null");
String deleteEndpoint = String.format("/%s/%s/%s", index, type, id);
client.httpDelete(deleteEndpoint, params == null ? Collections.emptyMap() : params.options);
}
public class DeleteOptions {
private final Map<String, Object> options = new HashMap<>();
/** Specific write consistency setting for the operation one of "one", "quorum", "all"*/
public void consistency(String consistency) {
options.put("consistency", consistency);
};
/** ID of parent document */
public void parent(String parent){
options.put("parent", parent);
};
/** Refresh the index after performing the operation */
public void refresh(Boolean refresh) {
options.put("refresh", refresh);
};
/** Specific routing value */
public void routing(String routing) {
options.put("routing", routing);
};
/** Explicit version number for concurrency control */
public void version(Number version) {
options.put("version", version);
};
/** Specific version type one of "internal", "external", "external_gte", "force" */
public void versionType(String versionType) {
options.put("version_type", versionType);
};
/** Explicit operation timeout */
public void timeout(String timeout) {
options.put("timeout", timeout);
};
}
}

View File

@ -0,0 +1,217 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.http.NameValuePair;
import org.apache.http.StatusLine;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.client.methods.HttpDelete;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.client.methods.HttpPut;
import org.apache.http.client.methods.HttpUriRequest;
import org.apache.http.client.utils.URLEncodedUtils;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClientBuilder;
import org.apache.http.message.BasicNameValuePair;
import java.io.BufferedReader;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.stream.Collectors;
/**
*/
public class RestClient implements Closeable{
private final CloseableHttpClient client;
private volatile Set<HttpHost> hosts;
private final String scheme;
private final Set<HttpHost> blackList = new CopyOnWriteArraySet<>();
public RestClient(HttpHost... hosts) {
this("http", HttpClientBuilder.create().setDefaultRequestConfig(RequestConfig.custom().setConnectTimeout(100).build()).build(), hosts);
}
public RestClient(String scheme, CloseableHttpClient client, HttpHost[] hosts) {
if (hosts.length == 0) {
throw new IllegalArgumentException("hosts must note be empty");
}
this.scheme = scheme;
this.client = client;
this.hosts = new HashSet<>(Arrays.asList(hosts));
}
public HttpResponse httpGet(String endpoint, Map<String, Object> params) throws IOException {
return httpGet(getHostIterator(true), endpoint, params);
}
HttpResponse httpGet(Iterable<HttpHost> hosts, String endpoint, Map<String, Object> params) throws IOException {
HttpUriRequest request = new HttpGet(buildUri(endpoint, pairs(params)));
return execute(request, hosts);
}
HttpResponse httpDelete(String endpoint, Map<String, Object> params) throws IOException {
HttpUriRequest request = new HttpDelete(buildUri(endpoint, pairs(params)));
return execute(request, getHostIterator(true));
}
HttpResponse httpPut(String endpoint, HttpEntity body, Map<String, Object> params) throws IOException {
HttpPut request = new HttpPut(buildUri(endpoint, pairs(params)));
request.setEntity(body);
return execute(request, getHostIterator(true));
}
HttpResponse httpPost(String endpoint, HttpEntity body, Map<String, Object> params) throws IOException {
HttpPost request = new HttpPost(buildUri(endpoint, pairs(params)));
request.setEntity(body);
return execute(request, getHostIterator(true));
}
private List<NameValuePair> pairs(Map<String, Object> options) {
return options.entrySet().stream().map(e -> new BasicNameValuePair(e.getKey(), e.getValue().toString()))
.collect(Collectors.toList());
}
public HttpResponse execute(HttpUriRequest request, Iterable<HttpHost> retryHosts) throws IOException {
IOException exception = null;
for (HttpHost singleHost : retryHosts) {
try {
return client.execute(singleHost, request);
} catch (IOException ex) {
if (this.hosts.contains(singleHost)) {
blackList.add(singleHost);
}
if (exception != null) {
exception.addSuppressed(ex);
} else {
exception = ex;
}
}
}
throw exception;
}
public URI buildUri(String path, List<NameValuePair> query) {
try {
return new URI(null, null, null, -1, path, URLEncodedUtils.format(query, StandardCharsets.UTF_8), null);
} catch (URISyntaxException e) {
throw new RuntimeException(e.getMessage(), e);
}
}
public Set<HttpHost> fetchNodes(HttpHost host, boolean useClientNodes, boolean local, boolean checkAvailable) throws IOException {
HttpResponse httpResponse = httpGet(Collections.singleton(host), "/_cat/nodes", Collections.singletonMap("h", "http,role"));
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine.getStatusCode() != 200) {
throw new RuntimeException("failed to fetch nodes: " + statusLine.getReasonPhrase());
}
HttpEntity entity = httpResponse.getEntity();
Set<HttpHost> hosts = new HashSet<>();
try (BufferedReader content = new BufferedReader(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))) {
String line;
while((line = content.readLine()) != null) {
final String[] split = line.split("\\s+");
assert split.length == 2;
String boundAddress = split[0];
String role = split[1];
if ("-".equals(split[0].trim()) == false) {
if ("d".equals(role.trim()) == false && useClientNodes == false) {
continue;
}
URI boundAddressAsURI = URI.create("http://" + boundAddress);
HttpHost newHost = new HttpHost(boundAddressAsURI.getHost(), boundAddressAsURI.getPort(), scheme);
if (checkAvailable == false || isAvailable(newHost)) {
hosts.add(newHost);
}
}
}
}
return hosts;
}
public String getClusterName(HttpHost host) throws IOException {
HttpResponse httpResponse = httpGet(Collections.singleton(host), "/_cat/health", Collections.singletonMap("h", "cluster"));
StatusLine statusLine = httpResponse.getStatusLine();
if (statusLine.getStatusCode() != 200) {
throw new RuntimeException("failed to fetch nodes: " + statusLine.getReasonPhrase());
}
HttpEntity entity = httpResponse.getEntity();
try (BufferedReader content = new BufferedReader(new InputStreamReader(entity.getContent(), StandardCharsets.UTF_8))) {
String clusterName = content.readLine().trim();
if (clusterName.length() == 0) {
throw new IllegalStateException("clustername must not be empty");
}
return clusterName;
}
}
public boolean isAvailable(HttpHost host) {
try {
HttpResponse httpResponse = httpGet(Collections.singleton(host), "/", Collections.emptyMap());
StatusLine statusLine = httpResponse.getStatusLine();
return statusLine.getStatusCode() == 200;
} catch (IOException ex) {
return false;
}
}
public synchronized void setNodes(Set<HttpHost> hosts) {
this.hosts = Collections.unmodifiableSet(new HashSet<>(hosts));
blackList.retainAll(hosts);
}
public Set<HttpHost> getHosts() {
return hosts;
}
protected Iterable<HttpHost> getHostIterator(boolean clearBlacklist) {
if (hosts.size() == blackList.size() && clearBlacklist) {
blackList.clear(); // lets try again
}
return () -> hosts.stream().filter((h) -> blackList.contains(h) == false).iterator();
}
int getNumHosts() {
return hosts.size();
}
int getNumBlacklistedHosts() {
return blackList.size();
}
@Override
public void close() throws IOException {
client.close();
}
}

View File

@ -0,0 +1,152 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.client;
import com.sun.net.httpserver.HttpExchange;
import com.sun.net.httpserver.HttpServer;
import org.apache.http.HttpHost;
import org.apache.http.HttpResponse;
import org.apache.lucene.util.LuceneTestCase;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutionException;
public class RestClientTests extends LuceneTestCase {
//TODO this should be refactored into a base test!!
HttpServer server;
protected String clusterName = "elasticsearch";
protected List<String> additionalNodes = Collections.emptyList();
public void setUp() throws Exception {
super.setUp();
server = HttpServer.create(new InetSocketAddress(0), 0);
server.setExecutor(null); // creates a default executor
server.start();
server.createContext("/", (t) -> {
handle("/", t);
});
server.createContext("/_cat/nodes", (t) -> {
handle("/_cat/nodes", t);
});
server.createContext("/_cat/health", (t) -> {
handle("/_cat/health", t);
});
}
protected void handle(String path, HttpExchange t) throws IOException {
final String response;
switch (path) {
case "/":
response = "{}";
break;
case "/_cat/nodes":
StringBuilder builder = new StringBuilder( "127.0.0.1:" + server.getAddress().getPort() + " " + "d\n");
for (String host : additionalNodes) {
builder.append(host).append("\n");
}
response = builder.toString();
break;
case "/_cat/health":
response = clusterName;
break;
default:
throw new IllegalArgumentException("no such handler " + path);
}
t.sendResponseHeaders(200, response.length());
OutputStream os = t.getResponseBody();
os.write(response.getBytes());
os.close();
}
public void tearDown() throws Exception {
super.tearDown();
server.stop(0);
}
public void testGetClustername() throws IOException {
HttpHost httpHost = new HttpHost("127.0.0.1", server.getAddress().getPort(), "http");
try(RestClient client = new RestClient(httpHost)) {
assertEquals(clusterName, client.getClusterName(httpHost));
}
}
public void testFetchNodes() throws IOException {
additionalNodes = Arrays.asList("127.0.0.2:9200 c", "127.0.0.3:9200 d");
HttpHost httpHost = new HttpHost("127.0.0.1", server.getAddress().getPort(), "http");
try(RestClient client = new RestClient(httpHost)) {
assertEquals(3, client.fetchNodes(httpHost, true, true, false).size());
assertTrue(client.fetchNodes(httpHost, true, true, false).toString(), client.fetchNodes(httpHost, true, true, false).contains(new HttpHost("127.0.0.2", 9200, "http")));
assertTrue(client.fetchNodes(httpHost, true, true, false).contains(new HttpHost("127.0.0.3", 9200, "http")));
assertTrue(client.fetchNodes(httpHost, true, true, false).contains(httpHost));
assertEquals(1, client.fetchNodes(httpHost, true, true, true).size());
}
}
public void testSimpleRetry() throws IOException{
additionalNodes = Arrays.asList("127.0.0.2:9200 c", "127.0.0.3:9200 d");
HttpHost httpHost = new HttpHost("127.0.0.1", server.getAddress().getPort(), "http");
try(RestClient client = new RestClient(httpHost)) {
client.setNodes(client.fetchNodes(httpHost, true, true, false));
HttpResponse httpResponse = client.httpGet("/_cat/health", Collections.emptyMap());
assertEquals(httpResponse.getStatusLine().getStatusCode(), 200);
server.stop(0);
try {
client.httpGet("/_cat/health", Collections.emptyMap());
fail();
} catch (IOException ex) {
assertTrue(ex.getMessage(), ex.getMessage().endsWith("failed: connect timed out") || ex.getMessage().endsWith("failed: Connection refused"));
}
}
}
public void testBlacklist() throws IOException{
additionalNodes = Arrays.asList("127.0.0.2:9200 c", "127.0.0.3:9200 d");
HttpHost httpHost = new HttpHost("127.0.0.1", server.getAddress().getPort(), "http");
try(RestClient client = new RestClient(httpHost)) {
client.setNodes(client.fetchNodes(httpHost, true, true, false));
assertEquals(3, client.getNumHosts());
assertEquals(0, client.getNumBlacklistedHosts());
server.stop(0);
try {
client.httpGet("/_cat/health", Collections.emptyMap());
fail();
} catch (IOException ex) {
assertTrue(ex.getMessage(), ex.getMessage().endsWith("failed: connect timed out") || ex.getMessage().endsWith("failed: Connection refused"));
}
assertEquals(3, client.getNumHosts());
assertEquals(3, client.getNumBlacklistedHosts());
int num = 0;
for (HttpHost host : client.getHostIterator(false)) {
num++; // nothing here
}
assertEquals(0, num);
for (HttpHost host : client.getHostIterator(true)) {
num++; // all there - we have to retry now
}
assertEquals(3, num);
}
}
}

View File

@ -5,6 +5,7 @@ List projects = [
'rest-api-spec',
'core',
'docs',
'client',
'distribution:integ-test-zip',
'distribution:zip',
'distribution:tar',