From 925afa3cab5daff9230db8cdb950e97e3780c998 Mon Sep 17 00:00:00 2001 From: markharwood Date: Thu, 10 Dec 2015 14:34:30 +0000 Subject: [PATCH] Graph - port of 2.x graph API and kibana UI plugin Closes X-plugins issue 518 Original commit: elastic/x-pack-elasticsearch@6c6371ed74a3aadde32b2f572d2a4568174c1bed --- .../smoke-test-graph-with-shield/build.gradle | 38 + .../qa/smoke-test-graph-with-shield/roles.yml | 17 + .../smoketest/GraphWithShieldIT.java | 75 ++ .../GraphWithShieldInsufficientRoleIT.java | 38 + elasticsearch/x-pack/build.gradle | 2 +- .../java/org/elasticsearch/graph/Graph.java | 97 +++ .../graph/action/Connection.java | 141 ++++ .../graph/action/GraphExploreAction.java | 30 + .../graph/action/GraphExploreRequest.java | 338 ++++++++ .../action/GraphExploreRequestBuilder.java | 158 ++++ .../graph/action/GraphExploreResponse.java | 203 +++++ .../org/elasticsearch/graph/action/Hop.java | 149 ++++ .../action/TransportGraphExploreAction.java | 781 ++++++++++++++++++ .../elasticsearch/graph/action/Vertex.java | 180 ++++ .../graph/action/VertexRequest.java | 198 +++++ .../graph/license/GraphLicensee.java | 59 ++ .../graph/license/GraphModule.java | 20 + .../graph/rest/action/RestGraphAction.java | 335 ++++++++ .../graph/license/LicenseTests.java | 150 ++++ .../elasticsearch/graph/test/GraphTests.java | 385 +++++++++ .../rest-api-spec/api/graph.explore.json | 33 + .../rest-api-spec/test/graph/10_basic.yaml | 43 + .../AbstractLicensesIntegrationTestCase.java | 2 + .../java/org/elasticsearch/shield/Shield.java | 13 + .../transport/KnownActionsTests.java | 5 + .../org/elasticsearch/transport/actions | 1 + .../org/elasticsearch/transport/handlers | 1 + .../org/elasticsearch/xpack/XPackPlugin.java | 10 + 28 files changed, 3501 insertions(+), 1 deletion(-) create mode 100644 elasticsearch/qa/smoke-test-graph-with-shield/build.gradle create mode 100644 elasticsearch/qa/smoke-test-graph-with-shield/roles.yml create mode 100644 elasticsearch/qa/smoke-test-graph-with-shield/src/test/java/org/elasticsearch/smoketest/GraphWithShieldIT.java create mode 100644 elasticsearch/qa/smoke-test-graph-with-shield/src/test/java/org/elasticsearch/smoketest/GraphWithShieldInsufficientRoleIT.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/Graph.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Connection.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreAction.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreRequest.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreRequestBuilder.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreResponse.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Hop.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/TransportGraphExploreAction.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Vertex.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/VertexRequest.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/license/GraphLicensee.java create mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/license/GraphModule.java create 
mode 100644 elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/rest/action/RestGraphAction.java create mode 100644 elasticsearch/x-pack/graph/src/test/java/org/elasticsearch/graph/license/LicenseTests.java create mode 100644 elasticsearch/x-pack/graph/src/test/java/org/elasticsearch/graph/test/GraphTests.java create mode 100644 elasticsearch/x-pack/graph/src/test/resources/rest-api-spec/api/graph.explore.json create mode 100644 elasticsearch/x-pack/graph/src/test/resources/rest-api-spec/test/graph/10_basic.yaml diff --git a/elasticsearch/qa/smoke-test-graph-with-shield/build.gradle b/elasticsearch/qa/smoke-test-graph-with-shield/build.gradle new file mode 100644 index 00000000000..59b1d61e454 --- /dev/null +++ b/elasticsearch/qa/smoke-test-graph-with-shield/build.gradle @@ -0,0 +1,38 @@ +apply plugin: 'elasticsearch.rest-test' + +dependencies { + testCompile project(path: ':x-plugins:elasticsearch:x-pack', configuration: 'runtime') +} + +// bring in graph rest test suite +task copyGraphRestTests(type: Copy) { + into project.sourceSets.test.output.resourcesDir + from project(':x-plugins:elasticsearch:x-pack').sourceSets.test.resources.srcDirs + include 'rest-api-spec/test/graph/**' +} + +integTest { + dependsOn copyGraphRestTests + + cluster { + plugin 'x-pack', project(':x-plugins:elasticsearch:x-pack') + extraConfigFile 'xpack/roles.yml', 'roles.yml' + setupCommand 'setupTestAdminUser', + 'bin/xpack/esusers', 'useradd', 'test_admin', '-p', 'changeme', '-r', 'admin' + setupCommand 'setupGraphExplorerUser', + 'bin/xpack/esusers', 'useradd', 'graph_explorer', '-p', 'changeme', '-r', 'graph_explorer' + setupCommand 'setupPowerlessUser', + 'bin/xpack/esusers', 'useradd', 'no_graph_explorer', '-p', 'changeme', '-r', 'no_graph_explorer' + waitCondition = { node, ant -> + File tmpFile = new File(node.cwd, 'wait.success') + ant.get(src: "http://${node.httpUri()}", + dest: tmpFile.toString(), + username: 'test_admin', + password: 'changeme', + ignoreerrors: true, + retries: 10) + return tmpFile.exists() + } + } +} + diff --git a/elasticsearch/qa/smoke-test-graph-with-shield/roles.yml b/elasticsearch/qa/smoke-test-graph-with-shield/roles.yml new file mode 100644 index 00000000000..e5559b90fe9 --- /dev/null +++ b/elasticsearch/qa/smoke-test-graph-with-shield/roles.yml @@ -0,0 +1,17 @@ +admin: + cluster: all + indices: + '*': all + +graph_explorer: + cluster: cluster:monitor/health + indices: + '*': + privileges: graph, indices:data/write/index, indices:admin/refresh, indices:admin/create + + +no_graph_explorer: + cluster: cluster:monitor/health + indices: + '*': + privileges: indices:data/read/search, indices:data/write/index, indices:admin/refresh, indices:admin/create diff --git a/elasticsearch/qa/smoke-test-graph-with-shield/src/test/java/org/elasticsearch/smoketest/GraphWithShieldIT.java b/elasticsearch/qa/smoke-test-graph-with-shield/src/test/java/org/elasticsearch/smoketest/GraphWithShieldIT.java new file mode 100644 index 00000000000..f86fd8ee11c --- /dev/null +++ b/elasticsearch/qa/smoke-test-graph-with-shield/src/test/java/org/elasticsearch/smoketest/GraphWithShieldIT.java @@ -0,0 +1,75 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.smoketest; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; + +import org.apache.http.client.methods.HttpPut; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.impl.conn.BasicHttpClientConnectionManager; +//import org.elasticsearch.client.support.Headers; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.concurrent.ThreadContext; +import org.elasticsearch.plugins.Plugin; +//import org.elasticsearch.shield.ShieldPlugin; +import org.elasticsearch.shield.authc.support.SecuredString; +import org.elasticsearch.shield.authc.support.UsernamePasswordToken; +import org.elasticsearch.test.rest.ESRestTestCase; +import org.elasticsearch.test.rest.RestTestCandidate; +import org.elasticsearch.test.rest.parser.RestTestParseException; +import org.junit.After; +import org.junit.Before; + +import java.io.IOException; +import java.net.URI; +import java.net.URL; +import java.util.Collection; +import java.util.Collections; + +import static org.elasticsearch.shield.authc.support.UsernamePasswordToken.basicAuthHeaderValue; + +public class GraphWithShieldIT extends ESRestTestCase { + + private final static String TEST_ADMIN_USERNAME = "test_admin"; + private final static String TEST_ADMIN_PASSWORD = "changeme"; + + public GraphWithShieldIT(@Name("yaml") RestTestCandidate testCandidate) { + super(testCandidate); + } + + @ParametersFactory + public static Iterable parameters() throws IOException, RestTestParseException { + return ESRestTestCase.createParameters(0, 1); + } + + protected String[] getCredentials() { + return new String[]{"graph_explorer", "changeme"}; + } + + + @Override + protected Settings restClientSettings() { + String[] creds = getCredentials(); + String token = basicAuthHeaderValue(creds[0], new SecuredString(creds[1].toCharArray())); + return Settings.builder() + .put(ThreadContext.PREFIX + ".Authorization", token) + .build(); + } + + @Override + protected Settings restAdminSettings() { + String token = basicAuthHeaderValue(TEST_ADMIN_USERNAME, new SecuredString(TEST_ADMIN_PASSWORD.toCharArray())); + return Settings.builder() + .put(ThreadContext.PREFIX + ".Authorization", token) + .build(); + } + + + +} + diff --git a/elasticsearch/qa/smoke-test-graph-with-shield/src/test/java/org/elasticsearch/smoketest/GraphWithShieldInsufficientRoleIT.java b/elasticsearch/qa/smoke-test-graph-with-shield/src/test/java/org/elasticsearch/smoketest/GraphWithShieldInsufficientRoleIT.java new file mode 100644 index 00000000000..795f98edbda --- /dev/null +++ b/elasticsearch/qa/smoke-test-graph-with-shield/src/test/java/org/elasticsearch/smoketest/GraphWithShieldInsufficientRoleIT.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.smoketest; + +import com.carrotsearch.randomizedtesting.annotations.Name; +import org.elasticsearch.test.rest.RestTestCandidate; +import org.junit.Test; + +import java.io.IOException; + +import static org.hamcrest.Matchers.containsString; + +public class GraphWithShieldInsufficientRoleIT extends GraphWithShieldIT { + + public GraphWithShieldInsufficientRoleIT(@Name("yaml") RestTestCandidate testCandidate) { + super(testCandidate); + } + + public void test() throws IOException { + try { + super.test(); + fail(); + } catch(AssertionError ae) { + assertThat(ae.getMessage(), containsString("action [indices:data/read/graph/explore")); + assertThat(ae.getMessage(), containsString("returned [403 Forbidden]")); + assertThat(ae.getMessage(), containsString("is unauthorized for user [no_graph_explorer]")); + } + } + + @Override + protected String[] getCredentials() { + return new String[]{"no_graph_explorer", "changeme"}; + } +} + diff --git a/elasticsearch/x-pack/build.gradle b/elasticsearch/x-pack/build.gradle index d0866249cf0..d23c14eed0b 100644 --- a/elasticsearch/x-pack/build.gradle +++ b/elasticsearch/x-pack/build.gradle @@ -53,7 +53,7 @@ dependencies { // we keep the source directories in the original structure of split plugins, // in order to facilitate backports to 2.x. TODO: remove after 5.0 release -for (String module : ['', 'license-plugin/', 'shield/', 'watcher/', 'marvel/']) { +for (String module : ['', 'license-plugin/', 'shield/', 'watcher/', 'marvel/', 'graph/']) { sourceSets { main { java.srcDir("${module}src/main/java") diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/Graph.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/Graph.java new file mode 100644 index 00000000000..2eeabdb0cd7 --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/Graph.java @@ -0,0 +1,97 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.graph; + +import static org.elasticsearch.common.settings.Setting.Scope.CLUSTER; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +import org.elasticsearch.action.ActionModule; +import org.elasticsearch.action.search.SearchAction; +import org.elasticsearch.common.component.LifecycleComponent; +import org.elasticsearch.common.inject.Module; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Setting; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.SettingsModule; +import org.elasticsearch.graph.action.GraphExploreAction; +import org.elasticsearch.graph.action.TransportGraphExploreAction; +import org.elasticsearch.graph.license.GraphLicensee; +import org.elasticsearch.graph.license.GraphModule; +import org.elasticsearch.graph.rest.action.RestGraphAction; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.search.action.SearchTransportService; +import org.elasticsearch.shield.Shield; +import org.elasticsearch.xpack.XPackPlugin; + +public class Graph extends Plugin { + + public static final String NAME = "graph"; + private final boolean transportClientMode; + protected final boolean enabled; + + + public Graph(Settings settings) { + this.transportClientMode = XPackPlugin.transportClientMode(settings); + enabled = enabled(settings); + // adding the graph privileges to shield + if (Shield.enabled(settings)) { + Shield.registerIndexPrivilege( "graph", GraphExploreAction.NAME, SearchTransportService.QUERY_ACTION_NAME, + SearchAction.NAME, SearchTransportService.QUERY_FETCH_ACTION_NAME); + } + } + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "Elasticsearch Graph Plugin"; + } + + public static boolean enabled(Settings settings) { + return XPackPlugin.featureEnabled(settings, NAME, true); + } + + public void onModule(ActionModule actionModule) { + if (enabled) { + actionModule.registerAction(GraphExploreAction.INSTANCE, TransportGraphExploreAction.class); + } + } + + public void onModule(NetworkModule module) { + if (enabled && transportClientMode == false) { + module.registerRestHandler(RestGraphAction.class); + } + } + + public void onModule(SettingsModule module) { + module.registerSetting(Setting.boolSetting(XPackPlugin.featureEnabledSetting(NAME), true, false, CLUSTER)); + } + + + public Collection nodeModules() { + if (enabled == false|| transportClientMode) { + return Collections.emptyList(); + } + return Arrays. asList(new GraphModule()); + } + + @Override + public Collection> nodeServices() { + if (enabled == false|| transportClientMode) { + return Collections.emptyList(); + } + return Arrays.>asList( + GraphLicensee.class + ); + } + +} diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Connection.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Connection.java new file mode 100644 index 00000000000..af7efc6d8ca --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Connection.java @@ -0,0 +1,141 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.graph.action; + +import com.carrotsearch.hppc.ObjectIntHashMap; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.ToXContent.Params; +import org.elasticsearch.graph.action.Vertex.VertexId; + +import java.io.IOException; +import java.util.Map; + +/** + * A Connection links exactly two {@link Vertex} objects. The basis of a + * connection is one or more documents have been found that contain + * this pair of terms and the strength of the connection is recorded + * as a weight. + */ +public class Connection { + Vertex from; + Vertex to; + double weight; + long docCount; + + Connection(Vertex from, Vertex to, double weight, long docCount) { + this.from = from; + this.to = to; + this.weight = weight; + this.docCount = docCount; + } + + void readFrom(StreamInput in, Map vertices) throws IOException { + from = vertices.get(new VertexId(in.readString(), in.readString())); + to = vertices.get(new VertexId(in.readString(), in.readString())); + weight = in.readDouble(); + docCount = in.readVLong(); + } + + Connection() { + } + + void writeTo(StreamOutput out) throws IOException { + out.writeString(from.getField()); + out.writeString(from.getTerm()); + out.writeString(to.getField()); + out.writeString(to.getTerm()); + out.writeDouble(weight); + out.writeVLong(docCount); + } + + public ConnectionId getId() { + return new ConnectionId(from.getId(), to.getId()); + } + + public Vertex getFrom() { + return from; + } + + public Vertex getTo() { + return to; + } + + /** + * @return a measure of the relative connectedness between a pair of {@link Vertex} objects + */ + public double getWeight() { + return weight; + } + + /** + * @return the number of documents in the sampled set that contained this + * pair of {@link Vertex} objects. + */ + public long getDocCount() { + return docCount; + } + + void toXContent(XContentBuilder builder, Params params, ObjectIntHashMap vertexNumbers) throws IOException { + builder.field("source", vertexNumbers.get(from)); + builder.field("target", vertexNumbers.get(to)); + builder.field("weight", weight); + builder.field("doc_count", docCount); + } + + /** + * An identifier (implements hashcode and equals) that represents a + * unique key for a {@link Connection} + */ + public static class ConnectionId { + private final VertexId source; + private final VertexId target; + + public ConnectionId(VertexId source, VertexId target) { + this.source = source; + this.target = target; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + ConnectionId vertexId = (ConnectionId) o; + + if (source != null ? !source.equals(vertexId.source) : vertexId.source != null) + return false; + if (target != null ? !target.equals(vertexId.target) : vertexId.target != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = source != null ? source.hashCode() : 0; + result = 31 * result + (target != null ? 
target.hashCode() : 0); + return result; + } + + public VertexId getSource() { + return source; + } + + public VertexId getTarget() { + return target; + } + + @Override + public String toString() { + return getSource() + "->" + getTarget(); + } + } +} \ No newline at end of file diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreAction.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreAction.java new file mode 100644 index 00000000000..33687ac98ec --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreAction.java @@ -0,0 +1,30 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.graph.action; + +import org.elasticsearch.action.Action; +import org.elasticsearch.client.ElasticsearchClient; + +public class GraphExploreAction extends Action { + + public static final GraphExploreAction INSTANCE = new GraphExploreAction(); + public static final String NAME = "indices:data/read/graph/explore"; + + private GraphExploreAction() { + super(NAME); + } + + @Override + public GraphExploreResponse newResponse() { + return new GraphExploreResponse(); + } + + @Override + public GraphExploreRequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new GraphExploreRequestBuilder(client, this); + } +} diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreRequest.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreRequest.java new file mode 100644 index 00000000000..3ddf8072399 --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreRequest.java @@ -0,0 +1,338 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.graph.action; + +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.IndicesRequest; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregatorBuilder; +import org.elasticsearch.search.aggregations.bucket.sampler.SamplerParser; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; + +/** + * Holds the criteria required to guide the exploration of connected terms which + * can be returned as a graph. 
+ */ +public class GraphExploreRequest extends ActionRequest implements IndicesRequest.Replaceable { + + public static final String NO_HOPS_ERROR_MESSAGE = "Graph explore request must have at least one hop"; + public static final String NO_VERTICES_ERROR_MESSAGE = "Graph explore hop must have at least one VertexRequest"; + private String[] indices = Strings.EMPTY_ARRAY; + private IndicesOptions indicesOptions = IndicesOptions.fromOptions(false, false, true, false); + private String[] types = Strings.EMPTY_ARRAY; + private String routing; + private TimeValue timeout; + + private int sampleSize = SamplerAggregatorBuilder.DEFAULT_SHARD_SAMPLE_SIZE; + private String sampleDiversityField; + private int maxDocsPerDiversityValue; + private boolean useSignificance = true; + private boolean returnDetailedInfo; + + private List hops = new ArrayList<>(); + + public GraphExploreRequest() { + } + + /** + * Constructs a new graph request to run against the provided + * indices. No indices means it will run against all indices. + */ + public GraphExploreRequest(String... indices) { + this.indices = indices; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (hops.size() == 0) { + validationException = ValidateActions.addValidationError(NO_HOPS_ERROR_MESSAGE, validationException); + } + for (Hop hop : hops) { + validationException = hop.validate(validationException); + } + return validationException; + } + + @Override + public String[] indices() { + return this.indices; + } + + + @Override + public GraphExploreRequest indices(String... indices) { + this.indices = indices; + return this; + } + + @Override + public IndicesOptions indicesOptions() { + return indicesOptions; + } + + public GraphExploreRequest indicesOptions(IndicesOptions indicesOptions) { + if (indicesOptions == null) { + throw new IllegalArgumentException("IndicesOptions must not be null"); + } + this.indicesOptions = indicesOptions; + return this; + } + + public String[] types() { + return this.types; + } + + public GraphExploreRequest types(String... types) { + this.types = types; + return this; + } + + public String routing() { + return this.routing; + } + + public GraphExploreRequest routing(String routing) { + this.routing = routing; + return this; + } + + public GraphExploreRequest routing(String... routings) { + this.routing = Strings.arrayToCommaDelimitedString(routings); + return this; + } + + public TimeValue timeout() { + return timeout; + } + + /** + * Graph exploration can be set to timeout after the given period. Search operations involved in + * each hop are limited to the remaining time available but can still overrun due to the nature + * of their "best efforts" timeout support. When a timeout occurs partial results are returned. 
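+     * A minimal illustration (the five second budget is an arbitrary example value):
+     * <pre>
+     *     request.timeout(TimeValue.timeValueSeconds(5)); // all hops share this overall budget
+     * </pre>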
+ * @param timeout a {@link TimeValue} object which determines the maximum length of time to spend exploring + */ + public GraphExploreRequest timeout(TimeValue timeout) { + if (timeout == null) { + throw new IllegalArgumentException("timeout must not be null"); + } + this.timeout = timeout; + return this; + } + + public GraphExploreRequest timeout(String timeout) { + timeout(TimeValue.parseTimeValue(timeout, null, getClass().getSimpleName() + ".timeout")); + return this; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + + indices = in.readStringArray(); + indicesOptions = IndicesOptions.readIndicesOptions(in); + types = in.readStringArray(); + routing = in.readOptionalString(); + if (in.readBoolean()) { + timeout = TimeValue.readTimeValue(in); + } + sampleSize = in.readInt(); + sampleDiversityField = in.readOptionalString(); + maxDocsPerDiversityValue = in.readInt(); + + useSignificance = in.readBoolean(); + returnDetailedInfo = in.readBoolean(); + + int numHops = in.readInt(); + Hop parentHop = null; + for (int i = 0; i < numHops; i++) { + Hop hop = new Hop(parentHop); + hop.readFrom(in); + hops.add(hop); + parentHop = hop; + } + + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeStringArray(indices); + indicesOptions.writeIndicesOptions(out); + out.writeStringArray(types); + out.writeOptionalString(routing); + out.writeOptionalStreamable(timeout); + + out.writeInt(sampleSize); + out.writeOptionalString(sampleDiversityField); + out.writeInt(maxDocsPerDiversityValue); + + out.writeBoolean(useSignificance); + out.writeBoolean(returnDetailedInfo); + out.writeInt(hops.size()); + for (Iterator iterator = hops.iterator(); iterator.hasNext();) { + Hop hop = iterator.next(); + hop.writeTo(out); + } + } + + @Override + public String toString() { + return "graph explore [" + Arrays.toString(indices) + "][" + Arrays.toString(types) + "]"; + } + + /** + * The number of top-matching documents that are considered during each hop (default is + * {@link SamplerAggregatorBuilder#DEFAULT_SHARD_SAMPLE_SIZE} + * Very small values (less than 50) may not provide sufficient weight-of-evidence to identify + * significant connections between terms. + *

+     * Very large values (many thousands) are not recommended with loosely defined queries (fuzzy queries or those
+     * with many OR clauses).
+     * This is because any useful signals in the best documents are diluted with irrelevant noise from low-quality matches.
+     * Performance is also typically better with smaller samples, as fewer look-ups are required for the background
+     * frequencies of terms found in the documents.
+     *
+     * @param maxNumberOfDocsPerHop shard-level sample size in documents
+     */
+    public void sampleSize(int maxNumberOfDocsPerHop) {
+        sampleSize = maxNumberOfDocsPerHop;
+    }
+
+    public int sampleSize() {
+        return sampleSize;
+    }
+
+    /**
+     * Optional choice of single-value field on which to diversify sampled
+     * search results
+     */
+    public void sampleDiversityField(String name) {
+        sampleDiversityField = name;
+    }
+
+    public String sampleDiversityField() {
+        return sampleDiversityField;
+    }
+
+    /**
+     * Optional number of permitted docs with same value in sampled search
+     * results. Must also declare which field using sampleDiversityField
+     */
+    public void maxDocsPerDiversityValue(int maxDocs) {
+        this.maxDocsPerDiversityValue = maxDocs;
+    }
+
+    public int maxDocsPerDiversityValue() {
+        return maxDocsPerDiversityValue;
+    }
+
+    /**
+     * Controls the choice of algorithm used to select interesting terms. The default
+     * value is true which means terms are selected based on significance (see the {@link SignificantTerms}
+     * aggregation) rather than popularity (using the {@link TermsAggregator}).
+     * @param value true if the significant_terms algorithm should be used.
+     */
+    public void useSignificance(boolean value) {
+        this.useSignificance = value;
+    }
+
+    public boolean useSignificance() {
+        return useSignificance;
+    }
+
+    /**
+     * Return detailed information about vertex frequencies as part of JSON results - defaults to false
+     * @param value true if detailed information is required in JSON responses
+     */
+    public void returnDetailedInfo(boolean value) {
+        this.returnDetailedInfo = value;
+    }
+
+    public boolean returnDetailedInfo() {
+        return returnDetailedInfo;
+    }
+
+
+    /**
+     * Add a stage in the graph exploration. Each hop represents a stage of
+     * querying elasticsearch to identify terms which can then be connected
+     * to other terms in a subsequent hop.
+ * @param guidingQuery optional choice of query which influences which documents + * are considered in this stage + * @return a {@link Hop} object that holds settings for a stage in the graph exploration + */ + public Hop createNextHop(QueryBuilder guidingQuery) { + Hop parent = null; + if (hops.size() > 0) { + parent = hops.get(hops.size() - 1); + } + Hop newHop = new Hop(parent); + newHop.guidingQuery = guidingQuery; + hops.add(newHop); + return newHop; + } + + public int getHopNumbers() { + return hops.size(); + } + + public Hop getHop(int hopNumber) { + return hops.get(hopNumber); + } + + public static class TermBoost { + String term; + float boost; + + public TermBoost(String term, float boost) { + super(); + this.term = term; + if (boost <= 0) { + throw new IllegalArgumentException("Boosts must be a positive non-zero number"); + } + this.boost = boost; + } + + TermBoost() { + } + + public String getTerm() { + return term; + } + + public float getBoost() { + return boost; + } + + void readFrom(StreamInput in) throws IOException { + this.term = in.readString(); + this.boost = in.readFloat(); + } + + void writeTo(StreamOutput out) throws IOException { + out.writeString(term); + out.writeFloat(boost); + } + + } + + +} diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreRequestBuilder.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreRequestBuilder.java new file mode 100644 index 00000000000..090ac3c3ab8 --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreRequestBuilder.java @@ -0,0 +1,158 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.graph.action; + +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.search.aggregations.bucket.sampler.SamplerAggregatorBuilder; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregator; + +/** + * Creates a new {@link GraphExploreRequestBuilder} + * + * @see GraphExploreRequest + */ +public class GraphExploreRequestBuilder extends ActionRequestBuilder { + + public GraphExploreRequestBuilder(ElasticsearchClient client, GraphExploreAction action) { + super(client, action, new GraphExploreRequest()); + } + + public GraphExploreRequestBuilder setIndices(String... indices) { + request.indices(indices); + return this; + } + + /** + * Specifies what type of requested indices to ignore and wildcard indices expressions. + *

+     * For example indices that don't exist.
+     */
+    public GraphExploreRequestBuilder setIndicesOptions(IndicesOptions options) {
+        request.indicesOptions(options);
+        return this;
+    }
+
+    /**
+     * A comma-separated list of routing values to control the shards the action will be executed on.
+     */
+    public GraphExploreRequestBuilder setRouting(String routing) {
+        request.routing(routing);
+        return this;
+    }
+
+    /**
+     * The routing values to control the shards that the action will be executed on.
+     */
+    public GraphExploreRequestBuilder setRouting(String... routing) {
+        request.routing(routing);
+        return this;
+    }
+
+    /**
+     * Optional choice of single-value field on which to diversify sampled
+     * search results
+     */
+    public GraphExploreRequestBuilder sampleDiversityField(String fieldName) {
+        request.sampleDiversityField(fieldName);
+        return this;
+    }
+
+    public String sampleDiversityField() {
+        return request.sampleDiversityField();
+    }
+
+    /**
+     * Optional number of permitted docs with same value in sampled search
+     * results. Must also declare which field using sampleDiversityField
+     */
+    public GraphExploreRequestBuilder maxDocsPerDiversityValue(int max) {
+        request.maxDocsPerDiversityValue(max);
+        return this;
+    }
+
+    public int maxDocsPerDiversityValue() {
+        return request.maxDocsPerDiversityValue();
+    }
+
+
+    /**
+     * An optional timeout to control how long the graph exploration is allowed
+     * to take.
+     */
+    public GraphExploreRequestBuilder setTimeout(TimeValue timeout) {
+        request.timeout(timeout);
+        return this;
+    }
+
+    /**
+     * An optional timeout to control how long the graph exploration is allowed
+     * to take.
+     */
+    public GraphExploreRequestBuilder setTimeout(String timeout) {
+        request.timeout(timeout);
+        return this;
+    }
+
+    /**
+     * The types of documents the graph exploration will run against. Defaults
+     * to all types.
+     */
+    public GraphExploreRequestBuilder setTypes(String... types) {
+        request.types(types);
+        return this;
+    }
+
+    /**
+     * Add a stage in the graph exploration. Each hop represents a stage of
+     * querying elasticsearch to identify terms which can then be connected
+     * to other terms in a subsequent hop.
+     * @param guidingQuery optional choice of query which influences which documents
+     * are considered in this stage
+     * @return a {@link Hop} object that holds settings for a stage in the graph exploration
+     */
+    public Hop createNextHop(@Nullable QueryBuilder guidingQuery) {
+        return request.createNextHop(guidingQuery);
+    }
+
+    /**
+     * Controls the choice of algorithm used to select interesting terms. The default
+     * value is true which means terms are selected based on significance (see the {@link SignificantTerms}
+     * aggregation) rather than popularity (using the {@link TermsAggregator}).
+     * @param value true if the significant_terms algorithm should be used.
+     */
+    public GraphExploreRequestBuilder useSignificance(boolean value) {
+        request.useSignificance(value);
+        return this;
+    }
+
+
+    /**
+     * The number of top-matching documents that are considered during each hop (default is
+     * {@link SamplerAggregatorBuilder#DEFAULT_SHARD_SAMPLE_SIZE}
+     * Very small values (less than 50) may not provide sufficient weight-of-evidence to identify
+     * significant connections between terms.
+     *

+     * Very large values (many thousands) are not recommended with loosely defined queries (fuzzy queries or
+     * those with many OR clauses).
+     * This is because any useful signals in the best documents are diluted with irrelevant noise from low-quality matches.
+     * Performance is also typically better with smaller samples, as fewer look-ups are required for the background
+     * frequencies of terms found in the documents.
+ * + * @param maxNumberOfDocsPerHop the shard-level sample size in documents + */ + public GraphExploreRequestBuilder sampleSize(int maxNumberOfDocsPerHop) { + request.sampleSize(maxNumberOfDocsPerHop); + return this; + } + +} diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreResponse.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreResponse.java new file mode 100644 index 00000000000..298b8a8a5cb --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/GraphExploreResponse.java @@ -0,0 +1,203 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.graph.action; + +import com.carrotsearch.hppc.ObjectIntHashMap; + +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentBuilderString; +import org.elasticsearch.graph.action.Connection.ConnectionId; +import org.elasticsearch.graph.action.Vertex.VertexId; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import static org.elasticsearch.action.search.ShardSearchFailure.readShardSearchFailure; + +/** + * Graph explore response holds a graph of {@link Vertex} and {@link Connection} objects + * (nodes and edges in common graph parlance). 
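+ * A minimal sketch of walking the returned graph (illustrative only; assumes the explore call has already completed):
+ * <pre>
+ *     for (Connection c : response.getConnections()) {
+ *         // each connection links two vertices and carries a weight plus a supporting doc count
+ *         String edge = c.getFrom().getTerm() + " -> " + c.getTo().getTerm() + " (" + c.getWeight() + ")";
+ *     }
+ * </pre>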
+ * + * @see GraphExploreRequest + */ +public class GraphExploreResponse extends ActionResponse implements ToXContent { + + private long tookInMillis; + private boolean timedOut = false; + private ShardOperationFailedException[] shardFailures = ShardSearchFailure.EMPTY_ARRAY; + private Map vertices; + private Map connections; + private boolean returnDetailedInfo; + static final String RETURN_DETAILED_INFO_PARAM = "returnDetailedInfo"; + + GraphExploreResponse() { + } + + GraphExploreResponse(long tookInMillis, boolean timedOut, ShardOperationFailedException[] shardFailures, Map vertices, + Map connections, boolean returnDetailedInfo) { + this.tookInMillis = tookInMillis; + this.timedOut = timedOut; + this.shardFailures = shardFailures; + this.vertices = vertices; + this.connections = connections; + this.returnDetailedInfo = returnDetailedInfo; + } + + + public TimeValue getTook() { + return new TimeValue(tookInMillis); + } + + public long getTookInMillis() { + return tookInMillis; + } + + /** + * @return true if the time stated in {@link GraphExploreRequest#timeout(TimeValue)} was exceeded + * (not all hops may have been completed in this case) + */ + public boolean isTimedOut() { + return this.timedOut; + } + public ShardOperationFailedException[] getShardFailures() { + return shardFailures; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + tookInMillis = in.readVLong(); + timedOut = in.readBoolean(); + + int size = in.readVInt(); + if (size == 0) { + shardFailures = ShardSearchFailure.EMPTY_ARRAY; + } else { + shardFailures = new ShardSearchFailure[size]; + for (int i = 0; i < shardFailures.length; i++) { + shardFailures[i] = readShardSearchFailure(in); + } + } + // read vertices + size = in.readVInt(); + vertices = new HashMap<>(); + for (int i = 0; i < size; i++) { + Vertex n = Vertex.readFrom(in); + vertices.put(n.getId(), n); + } + + size = in.readVInt(); + + connections = new HashMap<>(); + for (int i = 0; i < size; i++) { + Connection e = new Connection(); + e.readFrom(in, vertices); + connections.put(e.getId(), e); + } + + returnDetailedInfo = in.readBoolean(); + + } + + public Collection getConnections() { + return connections.values(); + } + + public Collection getVertices() { + return vertices.values(); + } + + public Vertex getVertex(VertexId id) { + return vertices.get(id); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeVLong(tookInMillis); + out.writeBoolean(timedOut); + + out.writeVInt(shardFailures.length); + for (ShardOperationFailedException shardSearchFailure : shardFailures) { + shardSearchFailure.writeTo(out); + } + + out.writeVInt(vertices.size()); + for (Vertex vertex : vertices.values()) { + vertex.writeTo(out); + } + + out.writeVInt(connections.size()); + for (Connection connection : connections.values()) { + connection.writeTo(out); + } + + out.writeBoolean(returnDetailedInfo); + + } + + static final class Fields { + static final XContentBuilderString TOOK = new XContentBuilderString("took"); + static final XContentBuilderString TIMED_OUT = new XContentBuilderString("timed_out"); + static final XContentBuilderString INDICES = new XContentBuilderString("_indices"); + static final XContentBuilderString FAILURES = new XContentBuilderString("failures"); + static final XContentBuilderString VERTICES = new XContentBuilderString("vertices"); + static final XContentBuilderString CONNECTIONS = new XContentBuilderString("connections"); + + } + + @Override + 
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.field(Fields.TOOK, tookInMillis); + builder.field(Fields.TIMED_OUT, timedOut); + + builder.startArray(Fields.FAILURES); + if (shardFailures != null) { + for (ShardOperationFailedException shardFailure : shardFailures) { + builder.startObject(); + shardFailure.toXContent(builder, params); + builder.endObject(); + } + } + builder.endArray(); + + ObjectIntHashMap vertexNumbers = new ObjectIntHashMap<>(vertices.size()); + + Map extraParams = new HashMap<>(); + extraParams.put(RETURN_DETAILED_INFO_PARAM, Boolean.toString(returnDetailedInfo)); + Params extendedParams = new DelegatingMapParams(extraParams, params); + + builder.startArray(Fields.VERTICES); + for (Vertex vertex : vertices.values()) { + builder.startObject(); + vertexNumbers.put(vertex, vertexNumbers.size()); + vertex.toXContent(builder, extendedParams); + builder.endObject(); + } + builder.endArray(); + + builder.startArray(Fields.CONNECTIONS); + for (Connection connection : connections.values()) { + builder.startObject(); + connection.toXContent(builder, extendedParams, vertexNumbers); + builder.endObject(); + } + builder.endArray(); + + return builder; + } + + +} diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Hop.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Hop.java new file mode 100644 index 00000000000..68dcfe03482 --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Hop.java @@ -0,0 +1,149 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.graph.action; + +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ValidateActions; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.index.query.QueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * A Hop represents one of potentially many stages in a graph exploration. + * Each Hop identifies one or more fields in which it will attempt to find + * terms that are significantly connected to the previous Hop. Each field is identified + * using a {@link VertexRequest} + * + *

+ * <p>An example series of Hops on webserver logs would be:
+ * <ol>
+ * <li>an initial Hop to find
+ * the top ten IPAddresses trying to access urls containing the word "admin"</li>
+ * <li>a secondary Hop to see which other URLs those IPAddresses were trying to access</li>
+ * </ol>
+ *
+ * Optionally, each hop can contain a "guiding query" that further limits the set of documents considered.
+ * In our weblog example above we might choose to constrain the second hop to only look at log records that
+ * had a response code of 404.
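+ * For example (an illustrative sketch only; the "response" field name is an assumed part of the log mapping):
+ * <pre>
+ *     secondHop.guidingQuery(QueryBuilders.termQuery("response", 404));
+ * </pre>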

+ * <p>
+ * If absent, the list of {@link VertexRequest}s is inherited from the prior Hop's list to avoid repeating
+ * the fields that will be examined at each stage.
+ * + */ +public class Hop { + final Hop parentHop; + List vertices = null; + QueryBuilder guidingQuery = null; + + Hop(Hop parent) { + this.parentHop = parent; + } + + public ActionRequestValidationException validate(ActionRequestValidationException validationException) { + + if (getEffectiveVertexRequests().size() == 0) { + validationException = ValidateActions.addValidationError(GraphExploreRequest.NO_VERTICES_ERROR_MESSAGE, validationException); + } + return validationException; + + } + + public Hop getParentHop() { + return parentHop; + } + + void writeTo(StreamOutput out) throws IOException { + if (guidingQuery == null) { + out.writeBoolean(false); + } else { + out.writeBoolean(true); + out.writeQuery(guidingQuery); + } + if (vertices == null) { + out.writeVInt(0); + } else { + out.writeVInt(vertices.size()); + for (VertexRequest vr : vertices) { + vr.writeTo(out); + } + } + } + + void readFrom(StreamInput in) throws IOException { + if (in.readBoolean()) { + guidingQuery = in.readQuery(); + } + int size = in.readVInt(); + if (size > 0) { + vertices = new ArrayList<>(); + for (int i = 0; i < size; i++) { + VertexRequest vr = new VertexRequest(); + vr.readFrom(in); + vertices.add(vr); + } + } + } + + public QueryBuilder guidingQuery() { + if (guidingQuery != null) { + return guidingQuery; + } + return QueryBuilders.matchAllQuery(); + } + + /** + * Add a field in which this {@link Hop} will look for terms that are highly linked to + * previous hops and optionally the guiding query. + * + * @param fieldName a field in the chosen index + */ + public VertexRequest addVertexRequest(String fieldName) { + if (vertices == null) { + vertices = new ArrayList<>(); + } + VertexRequest vr = new VertexRequest(); + vr.fieldName(fieldName); + vertices.add(vr); + return vr; + } + + /** + * An optional parameter that focuses the exploration on documents that + * match the given query. + * + * @param queryBuilder any query + */ + public void guidingQuery(QueryBuilder queryBuilder) { + guidingQuery = queryBuilder; + } + + protected List getEffectiveVertexRequests() { + if (vertices != null) { + return vertices; + } + if (parentHop == null) { + return Collections.emptyList(); + } + // otherwise inherit settings from parent + return parentHop.getEffectiveVertexRequests(); + } + + int getNumberVertexRequests() { + return getEffectiveVertexRequests().size(); + } + + VertexRequest getVertexRequest(int requestNumber) { + return getEffectiveVertexRequests().get(requestNumber); + } +} \ No newline at end of file diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/TransportGraphExploreAction.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/TransportGraphExploreAction.java new file mode 100644 index 00000000000..0c0ba350b66 --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/TransportGraphExploreAction.java @@ -0,0 +1,781 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.graph.action; + +import org.apache.lucene.search.BooleanQuery; +import org.apache.lucene.util.PriorityQueue; +import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ShardOperationFailedException; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.ShardSearchFailure; +import org.elasticsearch.action.search.TransportSearchAction; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.util.CollectionUtils; +import org.elasticsearch.graph.action.Connection.ConnectionId; +import org.elasticsearch.graph.action.GraphExploreRequest.TermBoost; +import org.elasticsearch.graph.action.Vertex.VertexId; +import org.elasticsearch.graph.license.GraphLicensee; +import org.elasticsearch.index.query.BoolQueryBuilder; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.license.plugin.core.LicenseUtils; +import org.elasticsearch.search.aggregations.AggregationBuilders; +import org.elasticsearch.search.aggregations.AggregatorBuilder; +import org.elasticsearch.search.aggregations.bucket.sampler.DiversifiedAggregatorBuilder; +import org.elasticsearch.search.aggregations.bucket.sampler.Sampler; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTerms.Bucket; +import org.elasticsearch.search.aggregations.bucket.significant.SignificantTermsAggregatorBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.Terms; +import org.elasticsearch.search.aggregations.bucket.terms.TermsAggregatorBuilder; +import org.elasticsearch.search.aggregations.bucket.terms.support.IncludeExclude; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Performs a series of elasticsearch queries and aggregations to explore + * connected terms in a single index. 
+ */
+public class TransportGraphExploreAction extends HandledTransportAction<GraphExploreRequest, GraphExploreResponse> {
+
+    private final TransportSearchAction searchAction;
+    protected final GraphLicensee licensee;
+
+    static class VertexPriorityQueue extends PriorityQueue<Vertex> {
+
+        public VertexPriorityQueue(int maxSize) {
+            super(maxSize);
+        }
+
+        @Override
+        protected boolean lessThan(Vertex a, Vertex b) {
+            return a.weight < b.weight;
+        }
+
+    }
+
+    @Inject
+    public TransportGraphExploreAction(Settings settings, ThreadPool threadPool, TransportSearchAction transportSearchAction,
+            TransportService transportService, ActionFilters actionFilters, IndexNameExpressionResolver indexNameExpressionResolver,
+            GraphLicensee licensee) {
+        super(settings, GraphExploreAction.NAME, threadPool, transportService, actionFilters, indexNameExpressionResolver,
+                GraphExploreRequest::new);
+        this.searchAction = transportSearchAction;
+        this.licensee = licensee;
+    }
+
+    @Override
+    protected void doExecute(GraphExploreRequest request, ActionListener<GraphExploreResponse> listener) {
+        if (licensee.isGraphExploreAllowed()) {
+            new AsyncGraphAction(request, listener).start();
+        } else {
+            listener.onFailure(LicenseUtils.newComplianceException(GraphLicensee.ID));
+        }
+    }
+
+    class AsyncGraphAction {
+
+        private final GraphExploreRequest request;
+        private final ActionListener<GraphExploreResponse> listener;
+
+        private final long startTime;
+        private final AtomicBoolean timedOut;
+        private volatile ShardOperationFailedException[] shardFailures;
+        private Map<VertexId, Vertex> vertices = new HashMap<>();
+        private Map<ConnectionId, Connection> connections = new HashMap<>();
+
+        // Each "hop" is recorded here using hopNumber -> fieldName -> vertices
+        private Map<Integer, Map<String, Set<Vertex>>> hopFindings = new HashMap<>();
+        private int currentHopNumber = 0;
+
+        AsyncGraphAction(GraphExploreRequest request, ActionListener<GraphExploreResponse> listener) {
+            this.request = request;
+            this.listener = listener;
+            this.startTime = threadPool.estimatedTimeInMillis();
+            this.timedOut = new AtomicBoolean(false);
+            this.shardFailures = ShardSearchFailure.EMPTY_ARRAY;
+        }
+
+        private Vertex getVertex(String field, String term) {
+            return vertices.get(Vertex.createId(field, term));
+        }
+
+        private Connection addConnection(Vertex from, Vertex to, double weight, long docCount) {
+            Connection connection = new Connection(from, to, weight, docCount);
+            connections.put(connection.getId(), connection);
+            return connection;
+        }
+
+        private Vertex addVertex(String field, String term, double score, int depth, long bg, long fg) {
+            VertexId key = Vertex.createId(field, term);
+            Vertex vertex = vertices.get(key);
+            if (vertex == null) {
+                vertex = new Vertex(field, term, score, depth, bg, fg);
+                vertices.put(key, vertex);
+                Map<String, Set<Vertex>> currentWave = hopFindings.get(currentHopNumber);
+                if (currentWave == null) {
+                    currentWave = new HashMap<>();
+                    hopFindings.put(currentHopNumber, currentWave);
+                }
+                Set<Vertex> verticesForField = currentWave.get(field);
+                if (verticesForField == null) {
+                    verticesForField = new HashSet<>();
+                    currentWave.put(field, verticesForField);
+                }
+                verticesForField.add(vertex);
+            }
+            return vertex;
+        }
+
+        private void removeVertex(Vertex vertex) {
+            vertices.remove(vertex.getId());
+            hopFindings.get(currentHopNumber).get(vertex.field).remove(vertex);
+        }
+
+
+        /**
+         * Step out from some existing vertex terms looking for useful
+         * connections
+         */
+        synchronized void expand() {
+            if (hasTimedOut()) {
+                timedOut.set(true);
+                listener.onResponse(buildResponse());
+                return;
+            }
+            Map<String, Set<Vertex>> lastHopFindings = hopFindings.get(currentHopNumber);
+            if ((currentHopNumber >= (request.getHopNumbers() - 1)) ||
+                    (lastHopFindings == null) || (lastHopFindings.size() == 0)) {
+                // Either we gathered no leads from the last hop or we have
+                // reached the final hop
+                listener.onResponse(buildResponse());
+                return;
+            }
+            Hop lastHop = request.getHop(currentHopNumber);
+            currentHopNumber++;
+            Hop currentHop = request.getHop(currentHopNumber);
+
+            final SearchRequest searchRequest = new SearchRequest(request.indices()).types(request.types()).indicesOptions(
+                    request.indicesOptions());
+            if (request.routing() != null) {
+                searchRequest.routing(request.routing());
+            }
+
+            BoolQueryBuilder rootBool = QueryBuilders.boolQuery();
+
+            // A single sample pool of docs is built at the root of the aggs tree.
+            // For quality's sake it might have made more sense to sample top docs
+            // for each of the terms from the previous hop (e.g. an initial query for "beatles"
+            // may have separate doc-sample pools for significant root terms "john", "paul", "yoko" etc)
+            // but I found this dramatically slowed down execution - each pool typically had different docs which
+            // each had non-overlapping sets of terms that needed frequencies looking up for significant terms.
+            // A common sample pool reduces the specialization that can be given to each root term but
+            // ultimately is much faster to run because of the shared vocabulary in a single sample set.
+            AggregatorBuilder sampleAgg = null;
+            if (request.sampleDiversityField() != null) {
+                DiversifiedAggregatorBuilder diversifiedSampleAgg = AggregationBuilders.diversifiedSampler("sample")
+                        .shardSize(request.sampleSize());
+                diversifiedSampleAgg.field(request.sampleDiversityField());
+                diversifiedSampleAgg.maxDocsPerValue(request.maxDocsPerDiversityValue());
+                sampleAgg = diversifiedSampleAgg;
+            } else {
+                sampleAgg = AggregationBuilders.sampler("sample").shardSize(request.sampleSize());
+            }
+
+            // Add any user-supplied criteria to the root query as a must clause
+            rootBool.must(currentHop.guidingQuery());
+
+            // Build a MUST clause that matches one of either
+            // a:) include clauses supplied by the client or
+            // b:) vertex terms from the previous hop.
+            BoolQueryBuilder sourceTermsOrClause = QueryBuilders.boolQuery();
+            addUserDefinedIncludesToQuery(currentHop, sourceTermsOrClause);
+            addBigOrClause(lastHopFindings, sourceTermsOrClause);
+
+            rootBool.must(sourceTermsOrClause);
+
+
+            // Now build the agg tree that will channel the content ->
+            // base agg is terms agg for terms from last wave (one per field),
+            // under each is a sig_terms agg to find next candidates (again, one per field)...
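+            // Illustrative sketch (not part of the original logic): for a request whose last hop
+            // examined "ip" and whose current hop examines "url" (assumed field names), the tree
+            // assembled below has the shape:
+            //   sampler "sample" (shard_size = request.sampleSize())
+            //     terms "field0" restricted to the last wave's ip terms
+            //       significant_terms "field0" yielding candidate url terms for the next wave
+            // i.e. one terms agg per last-hop vertex field, each wrapping one sig_terms (or plain
+            // terms) agg per current-hop vertex field.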
+            for (int fieldNum = 0; fieldNum < lastHop.getNumberVertexRequests(); fieldNum++) {
+                VertexRequest lastVr = lastHop.getVertexRequest(fieldNum);
+                Set<Vertex> lastWaveVerticesForField = lastHopFindings.get(lastVr.fieldName());
+                if (lastWaveVerticesForField == null) {
+                    continue;
+                }
+                String[] terms = new String[lastWaveVerticesForField.size()];
+                int i = 0;
+                for (Vertex v : lastWaveVerticesForField) {
+                    terms[i++] = v.term;
+                }
+                TermsAggregatorBuilder lastWaveTermsAgg = AggregationBuilders.terms("field" + fieldNum)
+                        .includeExclude(new IncludeExclude(terms, null))
+                        .shardMinDocCount(1)
+                        .field(lastVr.fieldName()).minDocCount(1)
+                        // Map execution mode used because the Sampler agg keeps us
+                        // focused on smaller sets of high quality docs and we therefore
+                        // examine smaller volumes of terms
+                        .executionHint("map")
+                        .size(terms.length);
+                sampleAgg.subAggregation(lastWaveTermsAgg);
+                for (int f = 0; f < currentHop.getNumberVertexRequests(); f++) {
+                    VertexRequest vr = currentHop.getVertexRequest(f);
+                    int size = vr.size();
+                    if (vr.fieldName().equals(lastVr.fieldName())) {
+                        // We have the potential for self-loops as we are looking at the same field so add 1 to the requested size
+                        // because we need to eliminate fieldA:termA -> fieldA:termA links that are likely to be in the results.
+                        size++;
+                    }
+                    if (request.useSignificance()) {
+                        SignificantTermsAggregatorBuilder nextWaveSigTerms = AggregationBuilders.significantTerms("field" + f)
+                                .field(vr.fieldName())
+                                .minDocCount(vr.minDocCount()).shardMinDocCount(vr.shardMinDocCount()).executionHint("map").size(size);
+//                        nextWaveSigTerms.significanceHeuristic(new PercentageScore.PercentageScoreBuilder());
+                        // Had some issues with no significant terms being returned when asking for a small
+                        // number of final results (e.g. 1) and only one shard. Setting shard_size higher helped.
+                        if (size < 10) {
+                            nextWaveSigTerms.shardSize(10);
+                        }
+                        // Alternative choices of significance algo didn't seem to be improvements....
+//                        nextWaveSigTerms.significanceHeuristic(new GND.GNDBuilder(true));
+//                        nextWaveSigTerms.significanceHeuristic(new ChiSquare.ChiSquareBuilder(false, true));
+
+                        if (vr.hasIncludeClauses()) {
+                            String[] includes = vr.includeValuesAsStringArray();
+                            nextWaveSigTerms.includeExclude(new IncludeExclude(includes, null));
+                            // Originally I thought users would always want the
+                            // same number of results as listed in the include
+                            // clause but it may be they only want the most
+                            // significant e.g. in the lastfm example of
+                            // plotting a single user's tastes and how that maps
+                            // into a network showing only the most interesting
+                            // band connections. So the line below is commented out.
+                            // nextWaveSigTerms.size(includes.length);
+
+                        } else if (vr.hasExcludeClauses()) {
+                            nextWaveSigTerms.includeExclude(new IncludeExclude(null, vr.excludesAsArray()));
+                        }
+                        lastWaveTermsAgg.subAggregation(nextWaveSigTerms);
+                    } else {
+                        TermsAggregatorBuilder nextWavePopularTerms = AggregationBuilders.terms("field" + f).field(vr.fieldName())
+                                .minDocCount(vr.minDocCount()).shardMinDocCount(vr.shardMinDocCount())
+                                // Map execution mode used because the Sampler agg keeps us
+                                // focused on smaller sets of high quality docs and we therefore
+                                // examine smaller volumes of terms
+                                .executionHint("map")
+                                .size(size);
+                        if (vr.hasIncludeClauses()) {
+                            String[] includes = vr.includeValuesAsStringArray();
+                            nextWavePopularTerms.includeExclude(new IncludeExclude(includes, null));
+                            // nextWavePopularTerms.size(includes.length);
+                        } else if (vr.hasExcludeClauses()) {
+                            nextWavePopularTerms.includeExclude(new IncludeExclude(null, vr.excludesAsArray()));
+                        }
+                        lastWaveTermsAgg.subAggregation(nextWavePopularTerms);
+                    }
+                }
+            }
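
Schematically, the search dispatched below returns no hits at all: the bool query gates documents to the guiding query plus last-wave terms, and all results flow back through the sampler tree built above. A compact sketch, with rootBool and sampleAgg as assembled in this method:

    // Sketch of the search assembled below: no hits are returned (size 0);
    // all information comes back through the "sample" aggregation tree.
    SearchSourceBuilder sketch = new SearchSourceBuilder()
            .query(rootBool)         // guiding query AND the big OR over last-wave terms
            .aggregation(sampleAgg)  // sample > terms(last wave) > sig_terms(candidates)
            .size(0);
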
+            // Execute the search
+            SearchSourceBuilder source = new SearchSourceBuilder().query(rootBool).aggregation(sampleAgg).size(0);
+            if (request.timeout() != null) {
+                source.timeout(TimeValue.timeValueMillis(timeRemainingMillis()));
+            }
+            searchRequest.source(source);
+
+            // System.out.println(source);
+            logger.trace("executing expansion graph search request");
+            searchAction.execute(searchRequest, new ActionListener<SearchResponse>() {
+                @Override
+                public void onResponse(SearchResponse searchResponse) {
+                    // System.out.println(searchResponse);
+                    addShardFailures(searchResponse.getShardFailures());
+
+                    ArrayList<Connection> newConnections = new ArrayList<>();
+                    ArrayList<Vertex> newVertices = new ArrayList<>();
+                    Sampler sample = searchResponse.getAggregations().get("sample");
+
+                    // We think of the total scores as the energy-level pouring
+                    // out of all the last hop's connections.
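
In concrete numbers, the normalization and decay described here work out as follows. A sketch with invented values; the real inputs come from the aggregation buckets processed below:

    // Sketch: how a candidate's share of this wave's "energy" is computed.
    double totalSignalOutput = 20.0;   // sum of all candidate scores in this wave
    double bucketScore = 5.0;          // one candidate's significance score (or doc count)
    double decay = 0.95d;              // damping applied at every hop
    double fromWeight = 0.6;           // weight of the source vertex we stepped out from
    double signalStrength = bucketScore / totalSignalOutput;        // 0.25 - normalized share
    signalStrength = signalStrength * Math.min(decay, fromWeight);  // 0.15 - decayed by source
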
+                    // Each new node encountered is given a score which is
+                    // normalized between zero and one based on
+                    // what percentage of the total scores its own score
+                    // provides
+                    double totalSignalOutput = getExpandTotalSignalStrength(lastHop, currentHop, sample);
+
+                    // Signal output can be zero if we did not encounter any new
+                    // terms as part of this stage
+                    if (totalSignalOutput > 0) {
+                        addAndScoreNewVertices(lastHop, currentHop, sample, totalSignalOutput, newConnections, newVertices);
+
+                        trimNewAdditions(currentHop, newConnections, newVertices);
+                    }
+
+                    // Potentially run another round of queries to perform the next "hop" - will terminate if no new additions
+                    expand();
+
+                }
+
+                // Add new vertices and apportion share of total signal along
+                // connections
+                private void addAndScoreNewVertices(Hop lastHop, Hop currentHop, Sampler sample, double totalSignalOutput,
+                        ArrayList<Connection> newConnections, ArrayList<Vertex> newVertices) {
+                    // Gather all matching terms into the graph and propagate
+                    // signals
+                    for (int j = 0; j < lastHop.getNumberVertexRequests(); j++) {
+                        VertexRequest lastVr = lastHop.getVertexRequest(j);
+                        Terms lastWaveTerms = sample.getAggregations().get("field" + j);
+                        if (lastWaveTerms == null) {
+                            // There were no terms from the previous phase that needed pursuing
+                            continue;
+                        }
+                        List buckets = lastWaveTerms.getBuckets();
+                        for (org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket lastWaveTerm : buckets) {
+                            Vertex fromVertex = getVertex(lastVr.fieldName(), lastWaveTerm.getKeyAsString());
+                            for (int k = 0; k < currentHop.getNumberVertexRequests(); k++) {
+                                VertexRequest vr = currentHop.getVertexRequest(k);
+                                // As we travel further out into the graph we apply a
+                                // decay to the signals being propagated down the various channels.
+                                double decay = 0.95d;
+                                if (request.useSignificance()) {
+                                    SignificantTerms significantTerms = lastWaveTerm.getAggregations().get("field" + k);
+                                    if (significantTerms != null) {
+                                        for (Bucket bucket : significantTerms.getBuckets()) {
+                                            if ((vr.fieldName().equals(fromVertex.field)) &&
+                                                    (bucket.getKeyAsString().equals(fromVertex.term))) {
+                                                // Avoid self-joins
+                                                continue;
+                                            }
+                                            double signalStrength = bucket.getSignificanceScore() / totalSignalOutput;
+
+                                            // Decay the signal by the weight attached to the source vertex
+                                            signalStrength = signalStrength * Math.min(decay, fromVertex.weight);
+
+                                            Vertex toVertex = getVertex(vr.fieldName(), bucket.getKeyAsString());
+                                            if (toVertex == null) {
+                                                toVertex = addVertex(vr.fieldName(), bucket.getKeyAsString(), signalStrength,
+                                                        currentHopNumber, bucket.getSupersetDf(), bucket.getSubsetDf());
+                                                newVertices.add(toVertex);
+                                            } else {
+                                                toVertex.weight += signalStrength;
+                                                // We cannot (without further querying) determine an accurate number
+                                                // for the foreground count of the toVertex term - if we sum the values
+                                                // from each fromVertex term we may actually double-count occurrences so
+                                                // the best we can do is take the maximum foreground value we have observed
+                                                toVertex.fg = Math.max(toVertex.fg, bucket.getSubsetDf());
+                                            }
+                                            newConnections.add(addConnection(fromVertex, toVertex, signalStrength, bucket.getDocCount()));
+                                        }
+                                    }
+                                } else {
+                                    Terms terms = lastWaveTerm.getAggregations().get("field" + k);
+                                    if (terms != null) {
+                                        for (org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket bucket : terms.getBuckets()) {
+                                            double signalStrength = bucket.getDocCount() / totalSignalOutput;
+                                            // Decay the signal by the weight attached to the source vertex
+                                            signalStrength = signalStrength * Math.min(decay, fromVertex.weight);
+
+                                            Vertex toVertex = getVertex(vr.fieldName(), bucket.getKeyAsString());
+                                            if (toVertex == null) {
+                                                toVertex = addVertex(vr.fieldName(), bucket.getKeyAsString(), signalStrength,
+                                                        currentHopNumber, 0, 0);
+                                                newVertices.add(toVertex);
+                                            } else {
+                                                toVertex.weight += signalStrength;
+                                            }
+                                            newConnections.add(addConnection(fromVertex, toVertex, signalStrength, bucket.getDocCount()));
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                }
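
A brief aside on why the foreground bookkeeping above takes a maximum rather than a sum (counts invented): if "john" and "paul" each co-occur with "yoko" in partly overlapping documents, summing the per-source subset counts would count the overlap twice.

    // Sketch: docs {d1, d2} contain john+yoko, docs {d2, d3} contain paul+yoko.
    long fgViaJohn = 2;                              // subsetDf observed from "john"
    long fgViaPaul = 2;                              // subsetDf observed from "paul"
    long summed = fgViaJohn + fgViaPaul;             // 4, but only 3 distinct docs exist
    long bestGuess = Math.max(fgViaJohn, fgViaPaul); // 2, the safe lower bound used above
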
+
+                // Having let the signals from the last results rattle around the graph
+                // we have adjusted weights for the various vertices we encountered.
+                // Now we review these new additions and remove those with the
+                // weakest weights.
+                // A priority queue is used to trim vertices according to the size settings
+                // requested for each field.
+                private final void trimNewAdditions(Hop currentHop, ArrayList<Connection> newConnections, ArrayList<Vertex> newVertices) {
+                    Set<Vertex> evictions = new HashSet<>();
+
+                    for (int k = 0; k < currentHop.getNumberVertexRequests(); k++) {
+                        // For each of the fields
+                        VertexRequest vr = currentHop.getVertexRequest(k);
+                        if (newVertices.size() <= vr.size()) {
+                            // Nothing to trim
+                            continue;
+                        }
+                        // Get the top vertices for this field
+                        VertexPriorityQueue pq = new VertexPriorityQueue(vr.size());
+                        for (Vertex vertex : newVertices) {
+                            if (vertex.field.equals(vr.fieldName())) {
+                                Vertex eviction = pq.insertWithOverflow(vertex);
+                                if (eviction != null) {
+                                    evictions.add(eviction);
+                                }
+                            }
+                        }
+                    }
+                    // Remove weak new nodes and their dangling connections from the main graph
+                    if (evictions.size() > 0) {
+                        for (Connection connection : newConnections) {
+                            if (evictions.contains(connection.to)) {
+                                connections.remove(connection.getId());
+                                removeVertex(connection.to);
+                            }
+                        }
+                    }
+                }
+                // TODO right now we only trim down to the best N vertices. We might also want to offer
+                // clients the option to limit to the best M connections. One scenario where this is required
+                // is if the "from" and "to" nodes are a client-supplied set of includes e.g. a list of
+                // music artists then the client may want to draw only the most-interesting connections
+                // between them. See https://github.com/elastic/x-plugins/issues/518#issuecomment-160186424
+                // I guess clients could trim the returned connections (which all have weights) but I wonder if
+                // we can do something server-side here
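
The VertexPriorityQueue declared at the top of this class is a Lucene min-heap ordered by weight, so insertWithOverflow hands back the weakest element once capacity is exceeded. A small worked sketch (terms and weights invented):

    // Sketch: keep the best 2 of 3 new vertices for a field.
    VertexPriorityQueue pq = new VertexPriorityQueue(2);  // capacity = vr.size()
    pq.insertWithOverflow(new Vertex("people", "yoko", 0.4, 1, 0, 0));
    pq.insertWithOverflow(new Vertex("people", "roy", 0.3, 1, 0, 0));
    Vertex evicted = pq.insertWithOverflow(new Vertex("people", "stevie", 0.1, 1, 0, 0));
    // evicted is the weakest vertex ("stevie", weight 0.1); trimNewAdditions() above
    // collects such evictions and strips them and their dangling connections.
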
+
+                // Helper method - compute the total signal of all scores in the search results
+                private final double getExpandTotalSignalStrength(Hop lastHop, Hop currentHop, Sampler sample) {
+                    double totalSignalOutput = 0;
+                    for (int j = 0; j < lastHop.getNumberVertexRequests(); j++) {
+                        VertexRequest lastVr = lastHop.getVertexRequest(j);
+                        Terms lastWaveTerms = sample.getAggregations().get("field" + j);
+                        if (lastWaveTerms == null) {
+                            continue;
+                        }
+                        List buckets = lastWaveTerms.getBuckets();
+                        for (org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket lastWaveTerm : buckets) {
+                            for (int k = 0; k < currentHop.getNumberVertexRequests(); k++) {
+                                VertexRequest vr = currentHop.getVertexRequest(k);
+                                if (request.useSignificance()) {
+                                    // Signal is based on significance score
+                                    SignificantTerms significantTerms = lastWaveTerm.getAggregations().get("field" + k);
+                                    if (significantTerms != null) {
+                                        for (Bucket bucket : significantTerms.getBuckets()) {
+                                            if ((vr.fieldName().equals(lastVr.fieldName()))
+                                                    && (bucket.getKeyAsString().equals(lastWaveTerm.getKeyAsString()))) {
+                                                // don't count self joins (term A obviously co-occurs with term A)
+                                                continue;
+                                            } else {
+                                                totalSignalOutput += bucket.getSignificanceScore();
+                                            }
+                                        }
+                                    }
+                                } else {
+                                    // Signal is based on popularity (number of documents)
+                                    Terms terms = lastWaveTerm.getAggregations().get("field" + k);
+                                    if (terms != null) {
+                                        for (org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket bucket : terms.getBuckets()) {
+                                            if ((vr.fieldName().equals(lastVr.fieldName()))
+                                                    && (bucket.getKeyAsString().equals(lastWaveTerm.getKeyAsString()))) {
+                                                // don't count self joins (term A obviously co-occurs with term A)
+                                                continue;
+                                            } else {
+                                                totalSignalOutput += bucket.getDocCount();
+                                            }
+                                        }
+                                    }
+                                }
+                            }
+                        }
+                    }
+                    return totalSignalOutput;
+                }
+
+                @Override
+                public void onFailure(Throwable e) {
+                    listener.onFailure(e);
+                }
+            });
+        }
+
+        private void addUserDefinedIncludesToQuery(Hop hop, BoolQueryBuilder sourceTermsOrClause) {
+            for (int i = 0; i < hop.getNumberVertexRequests(); i++) {
+                VertexRequest vr = hop.getVertexRequest(i);
+                if (vr.hasIncludeClauses()) {
+                    addNormalizedBoosts(sourceTermsOrClause, vr);
+                }
+            }
+        }
+
+        private void addBigOrClause(Map<String, Set<Vertex>> lastHopFindings, BoolQueryBuilder sourceTermsOrClause) {
+            int numClauses = sourceTermsOrClause.should().size();
+            for (Entry<String, Set<Vertex>> entry : lastHopFindings.entrySet()) {
+                numClauses += entry.getValue().size();
+            }
+            if (numClauses < BooleanQuery.getMaxClauseCount()) {
+                // We can afford to build a Boolean OR query with individual
+                // boosts for interesting terms
+                for (Entry<String, Set<Vertex>> entry : lastHopFindings.entrySet()) {
+                    for (Vertex vertex : entry.getValue()) {
+                        sourceTermsOrClause.should(QueryBuilders.constantScoreQuery(QueryBuilders.termQuery(vertex.field, vertex.term))
+                                .boost((float) vertex.weight));
+                    }
+                }
+            } else {
+                // Too many terms - we need a cheaper form of query to execute this
+                for (Entry<String, Set<Vertex>> entry : lastHopFindings.entrySet()) {
+                    List<String> perFieldTerms = new ArrayList<>();
+                    for (Vertex vertex : entry.getValue()) {
+                        perFieldTerms.add(vertex.term);
+                    }
+                    sourceTermsOrClause.should(QueryBuilders.constantScoreQuery(QueryBuilders.termsQuery(entry.getKey(), perFieldTerms)));
+                }
+            }
+        }
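
Lucene's BooleanQuery.getMaxClauseCount() defaults to 1024, so the method above degrades gracefully: under the limit each vertex term gets its own boosted constant-score clause, over it a field's terms collapse into one cheap terms query. Schematically (field, terms and boosts invented):

    // Sketch: the two query shapes addBigOrClause() can emit for a "people" field.
    BoolQueryBuilder orClause = QueryBuilders.boolQuery();
    // 1) Under the clause limit - one boosted clause per last-wave vertex term:
    orClause.should(QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("people", "john")).boost(0.8f));
    orClause.should(QueryBuilders.constantScoreQuery(QueryBuilders.termQuery("people", "paul")).boost(0.3f));
    // 2) Over the limit - all of a field's terms in a single unboosted clause:
    orClause.should(QueryBuilders.constantScoreQuery(QueryBuilders.termsQuery("people", "john", "paul", "george")));
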
+
+        /**
+         * For a given root query (or a set of "includes" root constraints) find
+         * the related terms. These will be our start points in the graph
+         * navigation.
+         */
+        public synchronized void start() {
+            try {
+
+                final SearchRequest searchRequest = new SearchRequest(request.indices()).types(request.types()).indicesOptions(
+                        request.indicesOptions());
+                if (request.routing() != null) {
+                    searchRequest.routing(request.routing());
+                }
+
+                BoolQueryBuilder rootBool = QueryBuilders.boolQuery();
+
+                AggregatorBuilder<?> rootSampleAgg = null;
+                if (request.sampleDiversityField() != null) {
+                    DiversifiedAggregatorBuilder diversifiedRootSampleAgg = AggregationBuilders.diversifiedSampler("sample")
+                            .shardSize(request.sampleSize());
+                    diversifiedRootSampleAgg.field(request.sampleDiversityField());
+                    diversifiedRootSampleAgg.maxDocsPerValue(request.maxDocsPerDiversityValue());
+                    rootSampleAgg = diversifiedRootSampleAgg;
+                } else {
+                    rootSampleAgg = AggregationBuilders.sampler("sample").shardSize(request.sampleSize());
+                }
+
+                Hop rootHop = request.getHop(0);
+
+                // Add any user-supplied criteria to the root query as a must clause
+                rootBool.must(rootHop.guidingQuery());
+
+                // If any of the root terms have an "include" restriction then
+                // we add a root-level MUST clause that
+                // mandates that at least one of the potentially many terms of
+                // interest must be matched (using a should array)
+                BoolQueryBuilder includesContainer = QueryBuilders.boolQuery();
+                addUserDefinedIncludesToQuery(rootHop, includesContainer);
+                if (includesContainer.should().size() > 0) {
+                    rootBool.must(includesContainer);
+                }
+
+                for (int i = 0; i < rootHop.getNumberVertexRequests(); i++) {
+                    VertexRequest vr = rootHop.getVertexRequest(i);
+                    if (request.useSignificance()) {
+                        SignificantTermsAggregatorBuilder sigBuilder = AggregationBuilders.significantTerms("field" + i);
+                        sigBuilder.field(vr.fieldName()).shardMinDocCount(vr.shardMinDocCount()).minDocCount(vr.minDocCount())
+                                // Map execution mode used because the Sampler agg
+                                // keeps us focused on smaller sets of high quality
+                                // docs and we therefore examine smaller volumes of terms
+                                .executionHint("map").size(vr.size());
+                        // It is feasible that clients could provide a choice of
+                        // significance heuristic at some point e.g:
+                        // sigBuilder.significanceHeuristic(new PercentageScore.PercentageScoreBuilder());
+
+                        if (vr.hasIncludeClauses()) {
+                            String[] includes = vr.includeValuesAsStringArray();
+                            sigBuilder.includeExclude(new IncludeExclude(includes, null));
+                            sigBuilder.size(includes.length);
+                        }
+                        if (vr.hasExcludeClauses()) {
+                            sigBuilder.includeExclude(new IncludeExclude(null, vr.excludesAsArray()));
+                        }
+                        rootSampleAgg.subAggregation(sigBuilder);
+                    } else {
+                        TermsAggregatorBuilder termsBuilder = AggregationBuilders.terms("field" + i);
+                        // Min doc count etc really only applies when we are
+                        // thinking about certainty of significance scores -
+                        // perhaps less necessary when considering popularity
+                        // termsBuilder.field(vr.fieldName()).shardMinDocCount(shardMinDocCount)
+                        // .minDocCount(minDocCount).executionHint("map").size(vr.size());
+                        termsBuilder.field(vr.fieldName()).executionHint("map").size(vr.size());
+                        if (vr.hasIncludeClauses()) {
+                            String[] includes = vr.includeValuesAsStringArray();
+                            termsBuilder.includeExclude(new IncludeExclude(includes, null));
+                            termsBuilder.size(includes.length);
+                        }
+                        if (vr.hasExcludeClauses()) {
+                            termsBuilder.includeExclude(new IncludeExclude(null, vr.excludesAsArray()));
+                        }
+                        rootSampleAgg.subAggregation(termsBuilder);
+                    }
+                }
+
+                // Run the search
+                SearchSourceBuilder source = new
SearchSourceBuilder() + .query(rootBool) + .aggregation(rootSampleAgg).size(0); + if (request.timeout() != null) { + source.timeout(request.timeout()); + } + searchRequest.source(source); + // System.out.println(source); + logger.trace("executing initial graph search request"); + searchAction.execute(searchRequest, new ActionListener() { + @Override + public void onResponse(SearchResponse searchResponse) { + addShardFailures(searchResponse.getShardFailures()); + Sampler sample = searchResponse.getAggregations().get("sample"); + + // Determine the total scores for all interesting terms + double totalSignalStrength = getInitialTotalSignalStrength(rootHop, sample); + + + // Now gather the best matching terms and compute signal weight according to their + // share of the total signal strength + for (int j = 0; j < rootHop.getNumberVertexRequests(); j++) { + VertexRequest vr = rootHop.getVertexRequest(j); + if (request.useSignificance()) { + SignificantTerms significantTerms = sample.getAggregations().get("field" + j); + List buckets = significantTerms.getBuckets(); + for (Bucket bucket : buckets) { + double signalWeight = bucket.getSignificanceScore() / totalSignalStrength; + addVertex(vr.fieldName(), bucket.getKeyAsString(), signalWeight, + currentHopNumber, bucket.getSupersetDf(), bucket.getSubsetDf()); + } + } else { + Terms terms = sample.getAggregations().get("field" + j); + List buckets = terms.getBuckets(); + for (org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket bucket : buckets) { + double signalWeight = bucket.getDocCount() / totalSignalStrength; + addVertex(vr.fieldName(), bucket.getKeyAsString(), signalWeight, currentHopNumber, 0, 0); + } + } + } + // Expand out from these root vertices looking for connections with other terms + expand(); + + } + + // Helper method - Provides a total signal strength for all terms connected to the initial query + private final double getInitialTotalSignalStrength(Hop rootHop, Sampler sample) { + double totalSignalStrength = 0; + for (int i = 0; i < rootHop.getNumberVertexRequests(); i++) { + if (request.useSignificance()) { + // Signal is based on significance score + SignificantTerms significantTerms = sample.getAggregations().get("field" + i); + List buckets = significantTerms.getBuckets(); + for (Bucket bucket : buckets) { + totalSignalStrength += bucket.getSignificanceScore(); + } + } else { + // Signal is based on popularity (number of documents) + Terms terms = sample.getAggregations().get("field" + i); + List buckets = terms.getBuckets(); + for (org.elasticsearch.search.aggregations.bucket.terms.Terms.Bucket bucket : buckets) { + totalSignalStrength += bucket.getDocCount(); + } + } + } + return totalSignalStrength; + } + + @Override + public void onFailure(Throwable e) { + listener.onFailure(e); + } + }); + } catch (Throwable t) { + logger.error("unable to execute the graph query", t); + listener.onFailure(t); + } + } + + private void addNormalizedBoosts(BoolQueryBuilder includesContainer, VertexRequest vr) { + TermBoost[] termBoosts = vr.includeValues(); + + + if ((includesContainer.should().size() + termBoosts.length) > BooleanQuery.getMaxClauseCount()) { + // Too many terms - we need a cheaper form of query to execute this + List termValues = new ArrayList<>(); + for (TermBoost tb : termBoosts) { + termValues.add(tb.term); + } + includesContainer.should(QueryBuilders.constantScoreQuery(QueryBuilders.termsQuery(vr.fieldName(), termValues))); + return; + + } + // We have a sufficiently low number of terms to use the per-term 
boosts. + // Lucene boosts are >=1 so we baseline the provided boosts to start + // from 1 + float minBoost = Float.MAX_VALUE; + for (TermBoost tb : termBoosts) { + minBoost = Math.min(minBoost, tb.boost); + } + for (TermBoost tb : termBoosts) { + float normalizedBoost = tb.boost / minBoost; + includesContainer.should(QueryBuilders.termQuery(vr.fieldName(), tb.term).boost(normalizedBoost)); + } + } + + boolean hasTimedOut() { + return request.timeout() != null && (timeRemainingMillis() <= 0); + } + + long timeRemainingMillis() { + // Actual resolution of timer is granularity of the interval + // configured globally for updating estimated time. + return (startTime + request.timeout().millis()) - threadPool.estimatedTimeInMillis(); + } + + void addShardFailure(ShardOperationFailedException failure) { + addShardFailures(new ShardOperationFailedException[]{failure}); + } + + void addShardFailures(ShardOperationFailedException[] failures) { + if (!CollectionUtils.isEmpty(failures)) { + ShardOperationFailedException[] duplicates = new ShardOperationFailedException[shardFailures.length + failures.length]; + System.arraycopy(shardFailures, 0, duplicates, 0, shardFailures.length); + System.arraycopy(failures, 0, duplicates, shardFailures.length, failures.length); + shardFailures = ExceptionsHelper.groupBy(duplicates); + } + } + + protected GraphExploreResponse buildResponse() { + long took = threadPool.estimatedTimeInMillis() - startTime; + return new GraphExploreResponse(took, timedOut.get(), shardFailures, vertices, connections, request.returnDetailedInfo()); + } + + + } +} diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Vertex.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Vertex.java new file mode 100644 index 00000000000..e9c4d5b77d5 --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/Vertex.java @@ -0,0 +1,180 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.graph.action; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.xcontent.ToXContent; +import org.elasticsearch.common.xcontent.XContentBuilder; + +import java.io.IOException; + +/** + * A vertex in a graph response represents a single term (a field and value pair) + * which appears in one or more documents found as part of the graph exploration. + * + * A vertex term could be a bank account number, an email address, a hashtag or any + * other term that appears in documents and is interesting to represent in a network. 
+ */ +public class Vertex implements ToXContent { + final String field; + final String term; + double weight; + final int depth; + final long bg; + long fg; + + Vertex(String field, String term, double weight, int depth, long bg, long fg) { + super(); + this.field = field; + this.term = term; + this.weight = weight; + this.depth = depth; + this.bg = bg; + this.fg = fg; + } + + static Vertex readFrom(StreamInput in) throws IOException { + return new Vertex(in.readString(), in.readString(), in.readDouble(), in.readVInt(), in.readVLong(), in.readVLong()); + } + + void writeTo(StreamOutput out) throws IOException { + out.writeString(field); + out.writeString(term); + out.writeDouble(weight); + out.writeVInt(depth); + out.writeVLong(bg); + out.writeVLong(fg); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + boolean returnDetailedInfo = params.paramAsBoolean(GraphExploreResponse.RETURN_DETAILED_INFO_PARAM, false); + builder.field("field", field); + builder.field("term", term); + builder.field("weight", weight); + builder.field("depth", depth); + if (returnDetailedInfo) { + builder.field("fg", fg); + builder.field("bg", bg); + } + return builder; + } + + /** + * @return a {@link VertexId} object that uniquely identifies this Vertex + */ + public VertexId getId() { + return createId(field, term); + } + + /** + * A convenience method for creating a {@link VertexId} + * @param field the field + * @param term the term + * @return a {@link VertexId} that can be used for looking up vertices + */ + public static VertexId createId(String field, String term) { + return new VertexId(field,term); + } + + @Override + public String toString() { + return getId().toString(); + } + + public String getField() { + return field; + } + + public String getTerm() { + return term; + } + + /** + * The weight of a vertex is an accumulation of all of the {@link Connection}s + * that are linked to this {@link Vertex} as part of a graph exploration. + * It is used internally to identify the most interesting vertices to be returned. + * @return a measure of the {@link Vertex}'s relative importance. + */ + public double getWeight() { + return weight; + } + + /** + * If the {@link GraphExploreRequest#useSignificance(boolean)} is true (the default) + * this statistic is available. + * @return the number of documents in the index that contain this term (see bg_count in + * + * the significant_terms aggregation) + */ + public long getBg() { + return bg; + } + + /** + * If the {@link GraphExploreRequest#useSignificance(boolean)} is true (the default) + * this statistic is available. + * Together with {@link #getBg()} these numbers are used to derive the significance of a term. 
+ * @return the number of documents in the sample of best matching documents that contain this term (see fg_count in + * + * the significant_terms aggregation) + */ + public long getFg() { + return fg; + } + + /** + * @return the sequence number in the series of hops where this Vertex term was first encountered + */ + public int getHopDepth() { + return depth; + } + + /** + * An identifier (implements hashcode and equals) that represents a + * unique key for a {@link Vertex} + */ + public static class VertexId { + private final String field; + private final String term; + + public VertexId(String field, String term) { + this.field = field; + this.term = term; + } + + @Override + public boolean equals(Object o) { + if (this == o) + return true; + if (o == null || getClass() != o.getClass()) + return false; + + VertexId vertexId = (VertexId) o; + + if (field != null ? !field.equals(vertexId.field) : vertexId.field != null) + return false; + if (term != null ? !term.equals(vertexId.term) : vertexId.term != null) + return false; + + return true; + } + + @Override + public int hashCode() { + int result = field != null ? field.hashCode() : 0; + result = 31 * result + (term != null ? term.hashCode() : 0); + return result; + } + + @Override + public String toString() { + return field + ":" + term; + } + } + +} \ No newline at end of file diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/VertexRequest.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/VertexRequest.java new file mode 100644 index 00000000000..95033887e93 --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/action/VertexRequest.java @@ -0,0 +1,198 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.graph.action; + +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.graph.action.GraphExploreRequest.TermBoost; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * A request to identify terms from a choice of field as part of a {@link Hop}. + * Optionally, a set of terms can be provided that are used as an exclusion or + * inclusion list to filter which terms are considered. 
+ * + */ +public class VertexRequest { + private String fieldName; + private int size = 5; + private Map includes; + private Set excludes; + public static final int DEFAULT_MIN_DOC_COUNT = 3; + private int minDocCount = DEFAULT_MIN_DOC_COUNT; + public static final int DEFAULT_SHARD_MIN_DOC_COUNT = 2; + private int shardMinDocCount = DEFAULT_SHARD_MIN_DOC_COUNT; + + + VertexRequest() { + + } + + void readFrom(StreamInput in) throws IOException { + fieldName = in.readString(); + size = in.readVInt(); + minDocCount = in.readVInt(); + shardMinDocCount = in.readVInt(); + + int numIncludes = in.readVInt(); + if (numIncludes > 0) { + includes = new HashMap<>(); + for (int i = 0; i < numIncludes; i++) { + TermBoost tb = new TermBoost(); + tb.readFrom(in); + includes.put(tb.term, tb); + } + } + + int numExcludes = in.readVInt(); + if (numExcludes > 0) { + excludes = new HashSet<>(); + for (int i = 0; i < numExcludes; i++) { + excludes.add(in.readString()); + } + } + + } + + void writeTo(StreamOutput out) throws IOException { + out.writeString(fieldName); + out.writeVInt(size); + out.writeVInt(minDocCount); + out.writeVInt(shardMinDocCount); + + if (includes != null) { + out.writeVInt(includes.size()); + for (TermBoost tb : includes.values()) { + tb.writeTo(out); + } + } else { + out.writeVInt(0); + } + + if (excludes != null) { + out.writeVInt(excludes.size()); + for (String term : excludes) { + out.writeString(term); + } + } else { + out.writeVInt(0); + } + } + + public String fieldName() { + return fieldName; + } + + public VertexRequest fieldName(String fieldName) { + this.fieldName = fieldName; + return this; + } + + public int size() { + return size; + } + + /** + * @param size The maximum number of terms that should be returned from this field as part of this {@link Hop} + */ + public VertexRequest size(int size) { + this.size = size; + return this; + } + + public boolean hasIncludeClauses() { + return includes != null && includes.size() > 0; + } + + public boolean hasExcludeClauses() { + return excludes != null && excludes.size() > 0; + } + + /** + * Adds a term that should be excluded from results + * @param term A term to be excluded + */ + public void addExclude(String term) { + if (includes != null) { + throw new IllegalArgumentException("Cannot have both include and exclude clauses"); + } + if (excludes == null) { + excludes = new HashSet<>(); + } + excludes.add(term); + } + + /** + * Adds a term to the set of allowed values - the boost defines the relative + * importance when pursuing connections in subsequent {@link Hop}s. The boost value + * appears as part of the query. 
+ * @param term a required term + * @param boost an optional boost + */ + public void addInclude(String term, float boost) { + if (excludes != null) { + throw new IllegalArgumentException("Cannot have both include and exclude clauses"); + } + if (includes == null) { + includes = new HashMap<>(); + } + includes.put(term, new TermBoost(term, boost)); + } + + public TermBoost[] includeValues() { + return includes.values().toArray(new TermBoost[includes.size()]); + } + + String[] includeValuesAsStringArray() { + String[] result = new String[includes.size()]; + int i = 0; + for (TermBoost tb : includes.values()) { + result[i++] = tb.term; + } + return result; + } + + String[] excludesAsArray() { + return excludes.toArray(new String[excludes.size()]); + } + + public int minDocCount() { + return minDocCount; + } + + /** + * A "certainty" threshold which defines the weight-of-evidence required before + * a term found in this field is identified as a useful connection + * + * @param value The minimum number of documents that contain this term found in the samples used across all shards + */ + public VertexRequest minDocCount(int value) { + minDocCount = value; + return this; + } + + + public int shardMinDocCount() { + return Math.min(shardMinDocCount, minDocCount); + } + + /** + * A "certainty" threshold which defines the weight-of-evidence required before + * a term found in this field is identified as a useful connection + * + * @param value The minimum number of documents that contain this term found in the samples used across all shards + */ + public VertexRequest shardMinDocCount(int value) { + shardMinDocCount = value; + return this; + } + +} \ No newline at end of file diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/license/GraphLicensee.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/license/GraphLicensee.java new file mode 100644 index 00000000000..a29c2ba2a7b --- /dev/null +++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/license/GraphLicensee.java @@ -0,0 +1,59 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
*/
+package org.elasticsearch.graph.license;
+
+import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.inject.Inject;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.license.core.License;
+import org.elasticsearch.license.plugin.core.AbstractLicenseeComponent;
+import org.elasticsearch.license.plugin.core.LicenseState;
+import org.elasticsearch.license.plugin.core.LicenseeRegistry;
+import org.elasticsearch.graph.Graph;
+
+import static org.elasticsearch.license.core.License.OperationMode.TRIAL;
+import static org.elasticsearch.license.core.License.OperationMode.PLATINUM;
+
+public class GraphLicensee extends AbstractLicenseeComponent {
+
+    public static final String ID = Graph.NAME;
+
+    @Inject
+    public GraphLicensee(Settings settings, LicenseeRegistry clientService) {
+        super(settings, ID, clientService);
+    }
+
+    @Override
+    public String[] expirationMessages() {
+        return new String[] {
+            "Graph explore APIs are disabled"
+        };
+    }
+
+    @Override
+    public String[] acknowledgmentMessages(License currentLicense, License newLicense) {
+        switch (newLicense.operationMode()) {
+            case BASIC:
+                if (currentLicense != null) {
+                    switch (currentLicense.operationMode()) {
+                        case TRIAL:
+                        case PLATINUM:
+                            return new String[] { "Graph will be disabled" };
+                    }
+                }
+                break;
+        }
+        return Strings.EMPTY_ARRAY;
+    }
+
+    public boolean isGraphExploreAllowed() {
+        Status localStatus = status;
+        boolean isLicenseStateActive = localStatus.getLicenseState() != LicenseState.DISABLED;
+        License.OperationMode operationMode = localStatus.getMode();
+        return isLicenseStateActive && (operationMode == TRIAL || operationMode == PLATINUM);
+    }
+}
diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/license/GraphModule.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/license/GraphModule.java
new file mode 100644
index 00000000000..0416ef9a7ce
--- /dev/null
+++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/license/GraphModule.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */
+package org.elasticsearch.graph.license;
+
+import org.elasticsearch.common.inject.AbstractModule;
+
+/**
+ *
+ */
+public class GraphModule extends AbstractModule {
+
+    @Override
+    protected void configure() {
+        bind(GraphLicensee.class).asEagerSingleton();
+    }
+
+}
diff --git a/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/rest/action/RestGraphAction.java b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/rest/action/RestGraphAction.java
new file mode 100644
index 00000000000..af74ba85f5d
--- /dev/null
+++ b/elasticsearch/x-pack/graph/src/main/java/org/elasticsearch/graph/rest/action/RestGraphAction.java
@@ -0,0 +1,335 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the Elastic License;
+ * you may not use this file except in compliance with the Elastic License.
+ */ +package org.elasticsearch.graph.rest.action; + +import org.elasticsearch.ElasticsearchParseException; +import org.elasticsearch.action.support.IndicesOptions; +import org.elasticsearch.client.Client; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.Strings; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.common.xcontent.XContentFactory; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.graph.action.GraphExploreRequest; +import org.elasticsearch.graph.action.GraphExploreResponse; +import org.elasticsearch.graph.action.Hop; +import org.elasticsearch.graph.action.VertexRequest; +import org.elasticsearch.graph.action.GraphExploreRequest.TermBoost; +import org.elasticsearch.index.query.QueryParseContext; +import org.elasticsearch.indices.query.IndicesQueriesRegistry; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestChannel; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.action.support.RestActions; +import org.elasticsearch.rest.action.support.RestToXContentListener; + +import java.io.IOException; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; + +import static org.elasticsearch.graph.action.GraphExploreAction.INSTANCE; +import static org.elasticsearch.rest.RestRequest.Method.GET; +import static org.elasticsearch.rest.RestRequest.Method.POST; + +/** + * @see GraphExploreRequest + */ +public class RestGraphAction extends BaseRestHandler { + + private IndicesQueriesRegistry indicesQueriesRegistry; + public static final ParseField TIMEOUT_FIELD = new ParseField("timeout"); + public static final ParseField SIGNIFICANCE_FIELD = new ParseField("use_significance"); + public static final ParseField RETURN_DETAILED_INFO = new ParseField("return_detailed_stats"); + public static final ParseField SAMPLE_DIVERSITY_FIELD = new ParseField("sample_diversity"); + public static final ParseField FIELD_NAME_FIELD = new ParseField("field"); + public static final ParseField MAX_DOCS_PER_VALUE_FIELD = new ParseField("max_docs_per_value"); + public static final ParseField SAMPLE_SIZE_FIELD = new ParseField("sample_size"); + public static final ParseField MIN_DOC_COUNT_FIELD = new ParseField("min_doc_count"); + public static final ParseField SHARD_MIN_DOC_COUNT_FIELD = new ParseField("shard_min_doc_count"); + public static final ParseField SIZE_FIELD = new ParseField("size"); + public static final ParseField INCLUDE_FIELD = new ParseField("include"); + public static final ParseField EXCLUDE_FIELD = new ParseField("exclude"); + public static final ParseField VERTICES_FIELD = new ParseField("vertices"); + public static final ParseField QUERY_FIELD = new ParseField("query"); + public static final ParseField CONTROLS_FIELD = new ParseField("controls"); + public static final ParseField CONNECTIONS_FIELD = new ParseField("connections"); + public static final ParseField BOOST_FIELD = new ParseField("boost"); + public static final ParseField TERM_FIELD = new ParseField("term"); + + @Inject + public RestGraphAction(Settings settings, RestController controller, Client client, IndicesQueriesRegistry indicesQueriesRegistry) { + super(settings, client); + controller.registerHandler(GET, "/{index}/_graph/explore", this); + 
controller.registerHandler(POST, "/{index}/_graph/explore", this); + controller.registerHandler(GET, "/{index}/{type}/_graph/explore", this); + controller.registerHandler(POST, "/{index}/{type}/_graph/explore", this); + this.indicesQueriesRegistry = indicesQueriesRegistry; + } + + @Override + public void handleRequest(final RestRequest request, final RestChannel channel, final Client client) throws IOException { + GraphExploreRequest graphRequest = new GraphExploreRequest(Strings.splitStringByCommaToArray(request.param("index"))); + graphRequest.indicesOptions(IndicesOptions.fromRequest(request, graphRequest.indicesOptions())); + graphRequest.routing(request.param("routing")); + if (request.hasParam(TIMEOUT_FIELD.getPreferredName())) { + graphRequest.timeout(request.paramAsTime(TIMEOUT_FIELD.getPreferredName(), null)); + } + if (!RestActions.hasBodyContent(request)) { + throw new ElasticsearchParseException("Body missing for graph request"); + } + QueryParseContext context = new QueryParseContext(indicesQueriesRegistry); + BytesReference qBytes = RestActions.getRestContent(request); + + Hop currentHop = graphRequest.createNextHop(null); + + try(XContentParser parser = XContentFactory.xContent(qBytes).createParser(qBytes)) { + + context.parser(parser); + + XContentParser.Token token = parser.nextToken(); + + if (token != XContentParser.Token.START_OBJECT) { + throw new ElasticsearchParseException("failed to parse search source. source must be an object, but found [{}] instead", + token.name()); + } + parseHop(parser, context, currentHop, graphRequest); + } + + graphRequest.types(Strings.splitStringByCommaToArray(request.param("type"))); + client.execute(INSTANCE, graphRequest, new RestToXContentListener(channel)); + } + + private void parseHop(XContentParser parser, QueryParseContext context, Hop currentHop, + GraphExploreRequest graphRequest) throws IOException { + String fieldName = null; + XContentParser.Token token; + + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + token = parser.nextToken(); + } + + if (token == XContentParser.Token.START_ARRAY) { + if (context.parseFieldMatcher().match(fieldName, VERTICES_FIELD)) { + parseVertices(parser, context, currentHop, graphRequest); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (context.parseFieldMatcher().match(fieldName, QUERY_FIELD)) { + currentHop.guidingQuery(context.parseInnerQueryBuilder()); + } else if (context.parseFieldMatcher().match(fieldName, CONNECTIONS_FIELD)) { + parseHop(parser, context, graphRequest.createNextHop(null), graphRequest); + } else if (context.parseFieldMatcher().match(fieldName, CONTROLS_FIELD)) { + if (currentHop.getParentHop() != null) { + throw new ElasticsearchParseException( + "Controls are a global setting that can only be set in the root " + fieldName, token.name()); + } + parseControls(parser, context, graphRequest); + } else { + throw new ElasticsearchParseException("Illegal object property in graph definition " + fieldName, token.name()); + + } + } else { + throw new ElasticsearchParseException("Illegal property in graph definition " + fieldName, token.name()); + } + + } + + } + + private void parseVertices(XContentParser parser, QueryParseContext context, Hop currentHop, GraphExploreRequest graphRequest) + throws IOException { + XContentParser.Token token; + + String fieldName = null; + + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if 
(token == XContentParser.Token.START_OBJECT) { + String field = null; + Map includes = null; + HashSet excludes = null; + int size = 10; + int minDocCount = 3; + int shardMinDocCount = 2; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + token = parser.nextToken(); + } + if (token == XContentParser.Token.START_ARRAY) { + if (context.parseFieldMatcher().match(fieldName, INCLUDE_FIELD)) { + if (excludes != null) { + throw new ElasticsearchParseException( + "Graph vertices definition cannot contain both "+INCLUDE_FIELD.getPreferredName()+" and " + +EXCLUDE_FIELD.getPreferredName()+" clauses", token.name()); + } + includes = new HashMap<>(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + if (token == XContentParser.Token.START_OBJECT) { + String includeTerm = null; + float boost = 1f; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else { + if (token == XContentParser.Token.VALUE_STRING) { + if (context.parseFieldMatcher().match(fieldName, TERM_FIELD)) { + includeTerm = parser.text(); + } else { + throw new ElasticsearchParseException( + "Graph vertices definition " + INCLUDE_FIELD.getPreferredName() + + " clause has invalid property:" + fieldName); + } + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (context.parseFieldMatcher().match(fieldName, BOOST_FIELD)) { + boost = parser.floatValue(); + } else { + throw new ElasticsearchParseException( + "Graph vertices definition " + INCLUDE_FIELD.getPreferredName() + + " clause has invalid property:" + fieldName); + } + } else { + throw new ElasticsearchParseException( + "Graph vertices definition " + INCLUDE_FIELD.getPreferredName() + + " clause has invalid property type:"+ token.name()); + + } + } + } + if (includeTerm == null) { + throw new ElasticsearchParseException( + "Graph vertices definition " + INCLUDE_FIELD.getPreferredName() + + " clause has missing object property for term"); + } + includes.put(includeTerm, new TermBoost(includeTerm, boost)); + } else if (token == XContentParser.Token.VALUE_STRING) { + String term = parser.text(); + includes.put(term, new TermBoost(term, 1f)); + } else { + throw new ElasticsearchParseException( + "Graph vertices definition " + INCLUDE_FIELD.getPreferredName() + + " clauses must be string terms or Objects with terms and boosts, not" + + token.name()); + } + } + } else if (context.parseFieldMatcher().match(fieldName, EXCLUDE_FIELD)) { + if (includes != null) { + throw new ElasticsearchParseException( + "Graph vertices definition cannot contain both "+ INCLUDE_FIELD.getPreferredName()+ + " and "+EXCLUDE_FIELD.getPreferredName()+" clauses", token.name()); + } + excludes = new HashSet(); + while ((token = parser.nextToken()) != XContentParser.Token.END_ARRAY) { + excludes.add(parser.text()); + } + } else { + throw new ElasticsearchParseException("Illegal property in graph vertices definition " + fieldName, + token.name()); + } + } + if (token == XContentParser.Token.VALUE_STRING) { + if (context.parseFieldMatcher().match(fieldName, FIELD_NAME_FIELD)) { + field = parser.text(); + } else { + throw new ElasticsearchParseException("Unknown string property: [" + fieldName + "]"); + } + } + if (token == XContentParser.Token.VALUE_NUMBER) { + if (context.parseFieldMatcher().match(fieldName, SIZE_FIELD)) { + size = parser.intValue(); + } else if 
(context.parseFieldMatcher().match(fieldName, MIN_DOC_COUNT_FIELD)) { + minDocCount = parser.intValue(); + } else if (context.parseFieldMatcher().match(fieldName, SHARD_MIN_DOC_COUNT_FIELD)) { + shardMinDocCount = parser.intValue(); + } else { + throw new ElasticsearchParseException("Unknown numeric property: [" + fieldName + "]"); + } + } + } + if (field == null) { + throw new ElasticsearchParseException("Missing field name in graph vertices definition", token.name()); + } + VertexRequest vr = currentHop.addVertexRequest(field); + if (includes != null) { + for (TermBoost tb : includes.values()) { + vr.addInclude(tb.getTerm(), tb.getBoost()); + } + } + if (excludes != null) { + for (String term : excludes) { + vr.addExclude(term); + } + } + vr.size(size); + vr.minDocCount(minDocCount); + vr.shardMinDocCount(shardMinDocCount); + + } + + } + + } + + + private void parseControls(XContentParser parser, QueryParseContext context, GraphExploreRequest graphRequest) throws IOException { + XContentParser.Token token; + + String fieldName = null; + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + } else if (token == XContentParser.Token.VALUE_NUMBER) { + if (context.parseFieldMatcher().match(fieldName, SAMPLE_SIZE_FIELD)) { + graphRequest.sampleSize(parser.intValue()); + } else if (context.parseFieldMatcher().match(fieldName, TIMEOUT_FIELD)) { + graphRequest.timeout(TimeValue.timeValueMillis(parser.longValue())); + } else { + throw new ElasticsearchParseException("Unknown numeric property: [" + fieldName + "]"); + } + } else if (token == XContentParser.Token.VALUE_BOOLEAN) { + if (context.parseFieldMatcher().match(fieldName, SIGNIFICANCE_FIELD)) { + graphRequest.useSignificance(parser.booleanValue()); + } else if (context.parseFieldMatcher().match(fieldName, RETURN_DETAILED_INFO)) { + graphRequest.returnDetailedInfo(parser.booleanValue()); + } else{ + throw new ElasticsearchParseException("Unknown boolean property: [" + fieldName + "]"); + } + } else if (token == XContentParser.Token.VALUE_STRING) { + if (context.parseFieldMatcher().match(fieldName, TIMEOUT_FIELD)) { + graphRequest.timeout(TimeValue.parseTimeValue(parser.text(), null, "timeout")); + } else { + throw new ElasticsearchParseException("Unknown numeric property: [" + fieldName + "]"); + } + } else if (token == XContentParser.Token.START_OBJECT) { + if (context.parseFieldMatcher().match(fieldName, SAMPLE_DIVERSITY_FIELD)) { + while ((token = parser.nextToken()) != XContentParser.Token.END_OBJECT) { + if (token == XContentParser.Token.FIELD_NAME) { + fieldName = parser.currentName(); + token = parser.nextToken(); + } + if (context.parseFieldMatcher().match(fieldName, FIELD_NAME_FIELD)) { + graphRequest.sampleDiversityField(parser.text()); + } else if (context.parseFieldMatcher().match(fieldName, MAX_DOCS_PER_VALUE_FIELD)) { + graphRequest.maxDocsPerDiversityValue(parser.intValue()); + } else { + throw new ElasticsearchParseException("Unknown property: [" + fieldName + "]"); + } + } + } else { + throw new ElasticsearchParseException("Unknown object property: [" + fieldName + "]"); + } + } else { + throw new ElasticsearchParseException("Unknown object property: [" + fieldName + "]"); + } + + } + } +} diff --git a/elasticsearch/x-pack/graph/src/test/java/org/elasticsearch/graph/license/LicenseTests.java b/elasticsearch/x-pack/graph/src/test/java/org/elasticsearch/graph/license/LicenseTests.java new file mode 100644 index 
00000000000..9d3efbbfba9 --- /dev/null +++ b/elasticsearch/x-pack/graph/src/test/java/org/elasticsearch/graph/license/LicenseTests.java @@ -0,0 +1,150 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.graph.license; + +import org.elasticsearch.common.component.AbstractComponent; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.license.core.License; +import org.elasticsearch.license.plugin.core.LicenseState; +import org.elasticsearch.license.plugin.core.Licensee; +import org.elasticsearch.license.plugin.core.LicenseeRegistry; +import org.elasticsearch.test.ESTestCase; + +import java.util.ArrayList; +import java.util.List; + +import static org.hamcrest.Matchers.is; + +public class LicenseTests extends ESTestCase { + + private SimpleLicenseeRegistry licenseeRegistry = new SimpleLicenseeRegistry(); + + public void testPlatinumTrialLicenseCanDoEverything() throws Exception { + licenseeRegistry.setOperationMode( + randomFrom(License.OperationMode.PLATINUM, License.OperationMode.TRIAL)); + GraphLicensee graphLicensee = new GraphLicensee(Settings.EMPTY, licenseeRegistry); + licenseeRegistry.register(graphLicensee); + + assertLicensePlatinumTrialBehaviour(graphLicensee); + } + + public void testBasicLicenseIsDisabled() throws Exception { + licenseeRegistry.setOperationMode(License.OperationMode.BASIC); + GraphLicensee graphLicensee = new GraphLicensee(Settings.EMPTY, licenseeRegistry); + licenseeRegistry.register(graphLicensee); + + assertLicenseBasicOrGoldOrNoneOrExpiredBehaviour(graphLicensee); + } + + public void testNoLicenseDoesNotWork() { + licenseeRegistry.setOperationMode(License.OperationMode.BASIC); + GraphLicensee graphLicensee = new GraphLicensee(Settings.EMPTY, licenseeRegistry); + licenseeRegistry.register(graphLicensee); + licenseeRegistry.disable(); + + assertLicenseBasicOrGoldOrNoneOrExpiredBehaviour(graphLicensee); + } + + public void testExpiredPlatinumTrialLicenseIsRestricted() throws Exception { + licenseeRegistry.setOperationMode( + randomFrom(License.OperationMode.PLATINUM, License.OperationMode.TRIAL)); + GraphLicensee graphLicensee = new GraphLicensee(Settings.EMPTY, licenseeRegistry); + licenseeRegistry.register(graphLicensee); + licenseeRegistry.disable(); + + assertLicenseBasicOrGoldOrNoneOrExpiredBehaviour(graphLicensee); + } + + public void testUpgradingFromBasicLicenseWorks() { + licenseeRegistry.setOperationMode(License.OperationMode.BASIC); + GraphLicensee graphLicensee = new GraphLicensee(Settings.EMPTY, licenseeRegistry); + licenseeRegistry.register(graphLicensee); + + assertLicenseBasicOrGoldOrNoneOrExpiredBehaviour(graphLicensee); + + licenseeRegistry.setOperationMode( + randomFrom(License.OperationMode.PLATINUM, License.OperationMode.TRIAL)); + assertLicensePlatinumTrialBehaviour(graphLicensee); + } + + public void testDowngradingToBasicLicenseWorks() { + licenseeRegistry.setOperationMode( + randomFrom(License.OperationMode.PLATINUM, License.OperationMode.TRIAL)); + GraphLicensee graphLicensee = new GraphLicensee(Settings.EMPTY, licenseeRegistry); + licenseeRegistry.register(graphLicensee); + + assertLicensePlatinumTrialBehaviour(graphLicensee); + + licenseeRegistry.setOperationMode(License.OperationMode.BASIC); + assertLicenseBasicOrGoldOrNoneOrExpiredBehaviour(graphLicensee); + } + + public void 
testDowngradingToGoldLicenseWorks() { + licenseeRegistry.setOperationMode( + randomFrom(License.OperationMode.PLATINUM, License.OperationMode.TRIAL)); + GraphLicensee graphLicensee = new GraphLicensee(Settings.EMPTY, licenseeRegistry); + licenseeRegistry.register(graphLicensee); + + assertLicensePlatinumTrialBehaviour(graphLicensee); + + licenseeRegistry.setOperationMode(License.OperationMode.GOLD); + assertLicenseBasicOrGoldOrNoneOrExpiredBehaviour(graphLicensee); + } + + public void testUpgradingExpiredLicenseWorks() { + licenseeRegistry.setOperationMode( + randomFrom(License.OperationMode.PLATINUM, License.OperationMode.TRIAL)); + GraphLicensee graphLicensee = new GraphLicensee(Settings.EMPTY, licenseeRegistry); + licenseeRegistry.register(graphLicensee); + licenseeRegistry.disable(); + + assertLicenseBasicOrGoldOrNoneOrExpiredBehaviour(graphLicensee); + + licenseeRegistry.setOperationMode( + randomFrom(License.OperationMode.PLATINUM, License.OperationMode.TRIAL)); + assertLicensePlatinumTrialBehaviour(graphLicensee); + } + + private void assertLicensePlatinumTrialBehaviour(GraphLicensee graphLicensee) { + assertThat("Expected graph exploration to be allowed", graphLicensee.isGraphExploreAllowed(), is(true)); + } + + private void assertLicenseBasicOrGoldOrNoneOrExpiredBehaviour(GraphLicensee graphLicensee) { + assertThat("Expected graph exploration not to be allowed", graphLicensee.isGraphExploreAllowed(), is(false)); + } + + public static class SimpleLicenseeRegistry extends AbstractComponent implements LicenseeRegistry { + private final List licensees = new ArrayList<>(); + private License.OperationMode operationMode; + + public SimpleLicenseeRegistry() { + super(Settings.EMPTY); + } + + @Override + public void register(Licensee licensee) { + licensees.add(licensee); + enable(); + } + + public void enable() { + for (Licensee licensee : licensees) { + licensee.onChange(new Licensee.Status(operationMode, randomBoolean() ? LicenseState.ENABLED : LicenseState.GRACE_PERIOD)); + } + } + + public void disable() { + for (Licensee licensee : licensees) { + licensee.onChange(new Licensee.Status(operationMode, LicenseState.DISABLED)); + } + } + + public void setOperationMode(License.OperationMode operationMode) { + this.operationMode = operationMode; + enable(); + } + } +} diff --git a/elasticsearch/x-pack/graph/src/test/java/org/elasticsearch/graph/test/GraphTests.java b/elasticsearch/x-pack/graph/src/test/java/org/elasticsearch/graph/test/GraphTests.java new file mode 100644 index 00000000000..878250c8584 --- /dev/null +++ b/elasticsearch/x-pack/graph/src/test/java/org/elasticsearch/graph/test/GraphTests.java @@ -0,0 +1,385 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. 
+ */ +package org.elasticsearch.graph.test; + +import org.apache.lucene.search.BooleanQuery; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.settings.Settings.Builder; +import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.env.NodeEnvironment; +import org.elasticsearch.graph.action.GraphExploreAction; +import org.elasticsearch.graph.action.GraphExploreRequest; +import org.elasticsearch.graph.action.GraphExploreRequestBuilder; +import org.elasticsearch.graph.action.GraphExploreResponse; +import org.elasticsearch.graph.action.Hop; +import org.elasticsearch.graph.action.Vertex; +import org.elasticsearch.graph.action.VertexRequest; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.index.query.ScriptQueryBuilder; +import org.elasticsearch.marvel.Marvel; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.script.AbstractSearchScript; +import org.elasticsearch.script.ExecutableScript; +import org.elasticsearch.script.NativeScriptFactory; +import org.elasticsearch.script.Script; +import org.elasticsearch.script.ScriptModule; +import org.elasticsearch.script.ScriptService.ScriptType; +import org.elasticsearch.shield.Shield; +import org.elasticsearch.test.ESSingleNodeTestCase; +import org.elasticsearch.watcher.Watcher; +import org.elasticsearch.xpack.XPackPlugin; + +import java.io.IOException; +import java.util.Collection; +import java.util.Map; + +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_REPLICAS; +import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; +import static org.elasticsearch.index.query.QueryBuilders.matchAllQuery; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; +import static org.hamcrest.Matchers.greaterThan; + + +public class GraphTests extends ESSingleNodeTestCase { + + static class DocTemplate { + int numDocs; + String[] people; + String description; + String decade; + + public DocTemplate(int numDocs, String decade, String description, String... 
people) { + super(); + this.decade = decade; + this.numDocs = numDocs; + this.description = description; + this.people = people; + } + } + + + static final DocTemplate[] socialNetTemplate = { + new DocTemplate(10, "60s", "beatles", "john", "paul", "george", "ringo"), + new DocTemplate(2, "60s", "collaboration", "ravi", "george"), + new DocTemplate(3, "80s", "travelling wilburys", "roy", "george", "jeff"), + new DocTemplate(5, "80s", "travelling wilburys", "roy", "jeff", "bob"), + new DocTemplate(1, "70s", "collaboration", "roy", "elvis"), + new DocTemplate(10, "90s", "nirvana", "dave", "kurt"), + new DocTemplate(2, "00s", "collaboration", "dave", "paul"), + new DocTemplate(2, "80s", "collaboration", "stevie", "paul"), + new DocTemplate(2, "70s", "collaboration", "john", "yoko"), + new DocTemplate(100, "70s", "fillerDoc", "other", "irrelevant", "duplicated", "spammy", "background") + }; + + + @Override + public void setUp() throws Exception { + super.setUp(); + createIndex("test", Settings.builder().put(SETTING_NUMBER_OF_SHARDS, 2).put(SETTING_NUMBER_OF_REPLICAS, 0).build()); + createIndex("idx_unmapped"); + + ensureGreen(); + + int numDocs = 0; + for (DocTemplate dt : socialNetTemplate) { + for (int i = 0; i < dt.numDocs; i++) { + client().prepareIndex("test", "type").setSource("decade", dt.decade, "people", dt.people, "description", dt.description) + .get(); + numDocs++; + } + } + client().admin().indices().prepareRefresh("test").get(); + assertHitCount(client().prepareSearch().setQuery(matchAllQuery()).get(), numDocs); + } + + @Override + protected Collection<Class<? extends Plugin>> getPlugins() { + return pluginList(ScriptedTimeoutPlugin.class, XPackPlugin.class); + } + + public void testSignificanceQueryCrawl() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); + hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles + grb.createNextHop(null).addVertexRequest("people").size(100).minDocCount(1); // friends of members of beatles + + GraphExploreResponse response = grb.get(); + + checkVertexDepth(response, 0, "john", "paul", "george", "ringo"); + checkVertexDepth(response, 1, "stevie", "yoko", "roy"); + checkVertexIsMoreImportant(response, "John's only collaboration is more relevant than one of Paul's many", "yoko", "stevie"); + checkVertexIsMoreImportant(response, "John's only collaboration is more relevant than George's with profligate Roy", "yoko", "roy"); + assertNull("Elvis is a 3rd tier connection so should not be returned here", response.getVertex(Vertex.createId("people","elvis"))); + } + + + @Override + public Settings nodeSettings() { + // Disable Shield, otherwise authentication failures occur when creating indices.
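+ // Marvel and Watcher are also disabled so the Graph feature runs in isolation for this test.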
+ Builder newSettings = Settings.builder(); + newSettings.put(XPackPlugin.featureEnabledSetting(Shield.NAME), false); + newSettings.put(XPackPlugin.featureEnabledSetting(Marvel.NAME), false); + newSettings.put(XPackPlugin.featureEnabledSetting(Watcher.NAME), false); + return newSettings.build(); + } + + public void testTargetedQueryCrawl() { + // Tests use of a client-provided query to steer exploration + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); + hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles + // 70s friends of beatles + grb.createNextHop(QueryBuilders.termQuery("decade", "70s")).addVertexRequest("people").size(100).minDocCount(1); + + GraphExploreResponse response = grb.get(); + + checkVertexDepth(response, 0, "john", "paul", "george", "ringo"); + checkVertexDepth(response, 1, "yoko"); + assertNull("Roy collaborated with George in the 80s not the 70s", response.getVertex(Vertex.createId("people","roy"))); + assertNull("Stevie collaborated with Paul in the 80s not the 70s", response.getVertex(Vertex.createId("people","stevie"))); + + } + + + + public void testLargeNumberTermsStartCrawl() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + Hop hop1 = grb.createNextHop(null); + VertexRequest peopleNames = hop1.addVertexRequest("people").minDocCount(1); + peopleNames.addInclude("john", 1); + + for (int i = 0; i < BooleanQuery.getMaxClauseCount()+1; i++) { + peopleNames.addInclude("unknown"+i, 1); + } + + grb.createNextHop(null).addVertexRequest("people").size(100).minDocCount(1); // friends of john + + GraphExploreResponse response = grb.get(); + + checkVertexDepth(response, 0, "john"); + checkVertexDepth(response, 1, "yoko"); + } + + public void testTargetedQueryCrawlDepth2() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); + hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles + // 00s friends of beatles + grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1); + // 90s friends of friends of beatles + grb.createNextHop(QueryBuilders.termQuery("decade", "90s")).addVertexRequest("people").size(100).minDocCount(1); + + GraphExploreResponse response = grb.get(); + + + checkVertexDepth(response, 0, "john", "paul", "george", "ringo"); + checkVertexDepth(response, 1, "dave"); + checkVertexDepth(response, 2, "kurt"); + + } + + public void testPopularityQueryCrawl() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + // Turning off the significance feature means we reward popularity + grb.useSignificance(false); + Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); + hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles + grb.createNextHop(null).addVertexRequest("people").size(100).minDocCount(1); // friends of members of beatles + + GraphExploreResponse response = grb.get(); + + checkVertexDepth(response, 0, "john", "paul", "george", "ringo"); + checkVertexDepth(response, 1, "stevie", "yoko", "roy");
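+ // With significance scoring disabled, vertex weights follow raw collaboration frequency, so prolific partners should outrank rare ones.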
+ checkVertexIsMoreImportant(response, "Yoko has more collaborations than Stevie", "yoko", "stevie"); + checkVertexIsMoreImportant(response, "Roy has more collaborations than Stevie", "roy", "stevie"); + assertNull("Elvis is a 3rd tier connection so should not be returned here", response.getVertex(Vertex.createId("people","elvis"))); + } + + public void testTimedoutQueryCrawl() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + grb.setTimeout(TimeValue.timeValueMillis(400)); + Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); + hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles + // 00s friends of beatles + grb.createNextHop(QueryBuilders.termQuery("decade", "00s")).addVertexRequest("people").size(100).minDocCount(1); + // A query that should cause a timeout + ScriptQueryBuilder timeoutQuery = QueryBuilders.scriptQuery(new Script(NativeTestScriptedTimeout.TEST_NATIVE_SCRIPT_TIMEOUT, + ScriptType.INLINE, "native", null)); + grb.createNextHop(timeoutQuery).addVertexRequest("people").size(100).minDocCount(1); + + GraphExploreResponse response = grb.get(); + assertTrue(response.isTimedOut()); + + checkVertexDepth(response, 0, "john", "paul", "george", "ringo"); + + // In most runs we reach dave within the allotted time before hitting the + // intended delay, but occasionally we don't, so this assertion stays commented out. + + // checkVertexDepth(response, 1, "dave"); + + // By this point we should certainly have run out of time thanks to + // the test query plugin's deliberate pause + assertNull("Should have timed out trying to crawl out to kurt", response.getVertex(Vertex.createId("people","kurt"))); + + } + + public void testNonDiversifiedCrawl() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + + Hop hop1 = grb.createNextHop(QueryBuilders.termsQuery("people", "dave", "other")); + hop1.addVertexRequest("people").size(10).minDocCount(1); + + GraphExploreResponse response = grb.get(); + + checkVertexDepth(response, 0, "dave", "kurt", "other", "spammy"); + checkVertexIsMoreImportant(response, "Due to duplication and no diversification spammy content beats signal", "spammy", "kurt"); + } + + public void testDiversifiedCrawl() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + grb.sampleDiversityField("description").maxDocsPerDiversityValue(1); + + Hop hop1 = grb.createNextHop(QueryBuilders.termsQuery("people", "dave", "other")); + hop1.addVertexRequest("people").size(10).minDocCount(1); + + GraphExploreResponse response = grb.get(); + + checkVertexDepth(response, 0, "dave", "kurt"); + assertNull("Duplicate spam should be removed from the results", response.getVertex(Vertex.createId("people","spammy"))); + } + + public void testInvalidDiversifiedCrawl() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + grb.sampleDiversityField("description").maxDocsPerDiversityValue(1); + + Hop hop1 = grb.createNextHop(QueryBuilders.termsQuery("people", "roy", "other")); + hop1.addVertexRequest("people").size(10).minDocCount(1); + + Throwable expectedError = null; + try { + grb.get(); + } catch (Throwable t) { + expectedError = t; + } + assertNotNull(expectedError); + String message = expectedError.toString(); + assertTrue(message.contains("Sample diversifying key must be a single valued-field")); + }
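+ + // Mixing a mapped and an unmapped index should yield the same graph as querying the mapped index alone.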
+ public void testMappedAndUnmappedQueryCrawl() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE) + .setIndices("test", "idx_unmapped"); + Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); + hop1.addVertexRequest("people").size(10).minDocCount(1); // members of beatles + grb.createNextHop(null).addVertexRequest("people").size(100).minDocCount(1); // friends of members of beatles + + GraphExploreResponse response = grb.get(); + + checkVertexDepth(response, 0, "john", "paul", "george", "ringo"); + checkVertexDepth(response, 1, "stevie", "yoko", "roy"); + checkVertexIsMoreImportant(response, "John's only collaboration is more relevant than one of Paul's many", "yoko", "stevie"); + checkVertexIsMoreImportant(response, "John's only collaboration is more relevant than George's with profligate Roy", "yoko", "roy"); + assertNull("Elvis is a 3rd tier connection so should not be returned here", response.getVertex(Vertex.createId("people","elvis"))); + } + + public void testUnmappedQueryCrawl() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("idx_unmapped"); + Hop hop1 = grb.createNextHop(QueryBuilders.termQuery("description", "beatles")); + hop1.addVertexRequest("people").size(10).minDocCount(1); + + GraphExploreResponse response = grb.get(); + assertEquals(0, response.getConnections().size()); + assertEquals(0, response.getVertices().size()); + + } + + public void testRequestValidation() { + GraphExploreRequestBuilder grb = new GraphExploreRequestBuilder(client(), GraphExploreAction.INSTANCE).setIndices("test"); + + try { + grb.get(); + fail("Validation failure expected"); + } catch (ActionRequestValidationException rte) { + assertTrue(rte.getMessage().contains(GraphExploreRequest.NO_HOPS_ERROR_MESSAGE)); + } + + // A hop with no vertex requests is also invalid + Hop hop = grb.createNextHop(null); + + try { + grb.get(); + fail("Validation failure expected"); + } catch (ActionRequestValidationException rte) { + assertTrue(rte.getMessage().contains(GraphExploreRequest.NO_VERTICES_ERROR_MESSAGE)); + } + + } + + + private static void checkVertexDepth(GraphExploreResponse response, int expectedDepth, String... ids) { + for (String id : ids) { + Vertex vertex = response.getVertex(Vertex.createId("people", id)); + assertNotNull("Expected to find " + id, vertex); + assertEquals(id + " found at wrong hop depth", expectedDepth, vertex.getHopDepth()); + } + } + + private static void checkVertexIsMoreImportant(GraphExploreResponse response, String why, String strongerId, String weakerId) { + // Very rarely the doc-delete randomization and background merges appear to conspire to + // make this test fail, presumably because deletes cause scores to vary slightly.
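+ // Asserting on relative weights rather than absolute scores keeps the check robust to those variations.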
+ Vertex strongVertex = response.getVertex(Vertex.createId("people", strongerId)); + assertNotNull(strongVertex); + Vertex weakVertex = response.getVertex(Vertex.createId("people",weakerId)); + assertNotNull(weakVertex); + assertThat(why, strongVertex.getWeight(), greaterThan(weakVertex.getWeight())); + } + + public static class ScriptedTimeoutPlugin extends Plugin { + @Override + public String name() { + return "test-scripted-graph-timeout"; + } + + @Override + public String description() { + return "Test for scripted timeouts on graph searches"; + } + + public void onModule(ScriptModule module) { + module.registerScript(NativeTestScriptedTimeout.TEST_NATIVE_SCRIPT_TIMEOUT, NativeTestScriptedTimeout.Factory.class); + } + } + + public static class NativeTestScriptedTimeout extends AbstractSearchScript { + + public static final String TEST_NATIVE_SCRIPT_TIMEOUT = "native_test_graph_timeout_script"; + + public static class Factory implements NativeScriptFactory { + + @Override + public ExecutableScript newScript(Map<String, Object> params) { + return new NativeTestScriptedTimeout(); + } + + @Override + public boolean needsScores() { + return false; + } + } + + @Override + public Object run() { + try { + Thread.sleep(750); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + return true; + } + } + +} \ No newline at end of file diff --git a/elasticsearch/x-pack/graph/src/test/resources/rest-api-spec/api/graph.explore.json b/elasticsearch/x-pack/graph/src/test/resources/rest-api-spec/api/graph.explore.json new file mode 100644 index 00000000000..02ea75fcadd --- /dev/null +++ b/elasticsearch/x-pack/graph/src/test/resources/rest-api-spec/api/graph.explore.json @@ -0,0 +1,33 @@ +{ + "graph.explore": { + "documentation": "https://www.elastic.co/guide/en/graph/current/explore.html", + "methods": ["GET", "POST"], + "url": { + "path": "/{index}/_graph/explore", + "paths": ["/{index}/_graph/explore", "/{index}/{type}/_graph/explore"], + "parts" : { + "index": { + "type" : "list", + "description" : "A comma-separated list of index names to search; use `_all` or empty string to perform the operation on all indices" + }, + "type": { + "type" : "list", + "description" : "A comma-separated list of document types to search; leave empty to perform the operation on all types" + } + }, + "params": { + "routing": { + "type" : "string", + "description" : "Specific routing value" + }, + "timeout": { + "type" : "time", + "description" : "Explicit operation timeout" + } + } + }, + "body": { + "description" : "Graph Query DSL" + } + } +} \ No newline at end of file diff --git a/elasticsearch/x-pack/graph/src/test/resources/rest-api-spec/test/graph/10_basic.yaml b/elasticsearch/x-pack/graph/src/test/resources/rest-api-spec/test/graph/10_basic.yaml new file mode 100644 index 00000000000..a41e4b2feaa --- /dev/null +++ b/elasticsearch/x-pack/graph/src/test/resources/rest-api-spec/test/graph/10_basic.yaml @@ -0,0 +1,43 @@ +--- +"Test basic graph explore": + - do: + indices.create: + index: test_1 + body: + settings: + index: + number_of_replicas: 0 + mappings: + test: + properties: + keys: + type : "integer" + + - do: + index: + index: test_1 + type: test + id: 1 + body: { keys: [1,2,3] } + + - do: + index: + index: test_1 + type: test + id: 2 + body: { keys: [4,5,6] } + + - do: + indices.refresh: {} + + - do: + cluster.health: + wait_for_status: green + + - do: + graph.explore: + index: test_1 + type: test + body: {"query": {"match": {"keys": 1}},"controls":{"use_significance":false},"vertices":[{"field": "keys","min_doc_count": 1}]}
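+ # only doc 1 matches the query, so its three keys values should be returned as vertices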
"keys","min_doc_count": 1}]} + - length: {failures: 0} + - length: {vertices: 3} diff --git a/elasticsearch/x-pack/license-plugin/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTestCase.java b/elasticsearch/x-pack/license-plugin/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTestCase.java index c95ac1b9780..7307dc898c2 100644 --- a/elasticsearch/x-pack/license-plugin/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTestCase.java +++ b/elasticsearch/x-pack/license-plugin/src/test/java/org/elasticsearch/license/plugin/AbstractLicensesIntegrationTestCase.java @@ -12,6 +12,7 @@ import org.elasticsearch.cluster.metadata.MetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.TimeValue; +import org.elasticsearch.graph.Graph; import org.elasticsearch.license.core.License; import org.elasticsearch.license.plugin.action.put.PutLicenseAction; import org.elasticsearch.license.plugin.action.put.PutLicenseRequestBuilder; @@ -50,6 +51,7 @@ public abstract class AbstractLicensesIntegrationTestCase extends ESIntegTestCas .put(XPackPlugin.featureEnabledSetting(Shield.NAME), false) .put(XPackPlugin.featureEnabledSetting(Marvel.NAME), false) .put(XPackPlugin.featureEnabledSetting(Watcher.NAME), false) + .put(XPackPlugin.featureEnabledSetting(Graph.NAME), false) .build(); } diff --git a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/Shield.java b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/Shield.java index 9cfa2f58159..2461d285bbc 100644 --- a/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/Shield.java +++ b/elasticsearch/x-pack/shield/src/main/java/org/elasticsearch/shield/Shield.java @@ -45,6 +45,7 @@ import org.elasticsearch.shield.authz.AuthorizationModule; import org.elasticsearch.shield.authz.accesscontrol.OptOutQueryCache; import org.elasticsearch.shield.authz.accesscontrol.ShieldIndexSearcherWrapper; import org.elasticsearch.shield.authz.privilege.ClusterPrivilege; +import org.elasticsearch.shield.authz.privilege.IndexPrivilege; import org.elasticsearch.shield.authz.store.FileRolesStore; import org.elasticsearch.shield.crypto.CryptoModule; import org.elasticsearch.shield.crypto.InternalCryptoService; @@ -289,6 +290,18 @@ public class Shield { // multiple nodes will try to add the same privileges multiple times. } } + + public static void registerIndexPrivilege(String name, String... patterns) { + try { + IndexPrivilege.addCustom(name, patterns); + } catch (Exception se) { + logger.warn("could not register index privilege [{}]", name); + + // we need to prevent bubbling the shield exception here for the tests. In the tests + // we create multiple nodes in the same jvm and since the custom cluster is a static binding + // multiple nodes will try to add the same privileges multiple times. + } + } private void addUserSettings(Settings.Builder settingsBuilder) { String authHeaderSettingName = ThreadContext.PREFIX + "." 
+ UsernamePasswordToken.BASIC_AUTH_HEADER; diff --git a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/transport/KnownActionsTests.java b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/transport/KnownActionsTests.java index 043122d658f..f685cf46ed6 100644 --- a/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/transport/KnownActionsTests.java +++ b/elasticsearch/x-pack/shield/src/test/java/org/elasticsearch/transport/KnownActionsTests.java @@ -10,6 +10,7 @@ import org.elasticsearch.action.Action; import org.elasticsearch.common.io.PathUtils; import org.elasticsearch.common.io.Streams; import org.elasticsearch.license.plugin.Licensing; +import org.elasticsearch.graph.Graph; import org.elasticsearch.shield.action.ShieldActionModule; import org.elasticsearch.test.ESIntegTestCase.ClusterScope; import org.elasticsearch.test.ShieldIntegTestCase; @@ -112,6 +113,10 @@ public class KnownActionsTests extends ShieldIntegTestCase { // also loading all actions from the licensing plugin loadActions(collectSubClasses(Action.class, Licensing.class), actions); + // also loading all actions from the graph plugin + loadActions(collectSubClasses(Action.class, Graph.class), actions); + + return unmodifiableSet(actions); } diff --git a/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/actions b/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/actions index 2c404c3a436..32bf4a63e4f 100644 --- a/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/actions +++ b/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/actions @@ -54,6 +54,7 @@ indices:monitor/upgrade indices:data/read/explain indices:data/read/field_stats indices:data/read/get +indices:data/read/graph/explore indices:data/read/mget indices:data/read/mpercolate indices:data/read/msearch diff --git a/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/handlers b/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/handlers index 3282cdb2d8e..cd59ee501b7 100644 --- a/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/handlers +++ b/elasticsearch/x-pack/shield/src/test/resources/org/elasticsearch/transport/handlers @@ -36,6 +36,7 @@ indices:admin/validate/query[s] indices:data/read/explain[s] indices:data/read/field_stats[s] indices:data/read/get[s] +indices:data/read/graph/explore indices:data/read/mget[shard][s] indices:data/read/mpercolate[shard][s] indices:data/read/mtv[shard][s] diff --git a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java index 2df5e099af3..1d19871c46f 100644 --- a/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java +++ b/elasticsearch/x-pack/src/main/java/org/elasticsearch/xpack/XPackPlugin.java @@ -15,6 +15,7 @@ import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.settings.SettingsModule; import org.elasticsearch.env.Environment; +import org.elasticsearch.graph.Graph; import org.elasticsearch.index.IndexModule; import org.elasticsearch.license.plugin.Licensing; import org.elasticsearch.marvel.Marvel; @@ -71,6 +72,7 @@ public class XPackPlugin extends Plugin { protected Shield shield; protected Marvel marvel; protected Watcher watcher; + protected Graph graph; public XPackPlugin(Settings settings) { this.settings = settings; @@ 
-78,6 +80,7 @@ public class XPackPlugin extends Plugin { this.shield = new Shield(settings); this.marvel = new Marvel(settings); this.watcher = new Watcher(settings); + this.graph = new Graph(settings); } @Override public String name() { @@ -96,6 +99,7 @@ public class XPackPlugin extends Plugin { modules.addAll(shield.nodeModules()); modules.addAll(watcher.nodeModules()); modules.addAll(marvel.nodeModules()); + modules.addAll(graph.nodeModules()); return modules; } @@ -110,6 +114,7 @@ public class XPackPlugin extends Plugin { services.addAll(shield.nodeServices()); services.addAll(watcher.nodeServices()); services.addAll(marvel.nodeServices()); + services.addAll(graph.nodeServices()); return services; } @@ -118,6 +123,7 @@ public class XPackPlugin extends Plugin { Settings.Builder builder = Settings.builder(); builder.put(shield.additionalSettings()); builder.put(watcher.additionalSettings()); + builder.put(graph.additionalSettings()); return builder.build(); } @@ -133,6 +139,7 @@ public class XPackPlugin extends Plugin { shield.onModule(module); marvel.onModule(module); watcher.onModule(module); + graph.onModule(module); licensing.onModule(module); } @@ -140,16 +147,19 @@ public class XPackPlugin extends Plugin { licensing.onModule(module); shield.onModule(module); watcher.onModule(module); + graph.onModule(module); } public void onModule(ActionModule module) { licensing.onModule(module); shield.onModule(module); watcher.onModule(module); + graph.onModule(module); } public void onIndexModule(IndexModule module) { shield.onIndexModule(module); + graph.onIndexModule(module); } public void onModule(LazyInitializationModule module) {