From 4bebc307c312ac56ab42b2f20c340525b54eeca1 Mon Sep 17 00:00:00 2001 From: Igor Motov Date: Mon, 11 Dec 2017 11:36:02 -0500 Subject: [PATCH] SQL: Add ability to close cursors (elastic/x-pack-elasticsearch#3249) This commits adds a new end point for closing in-flight cursors, it also ensures that all cursors are properly closed by adding after test checks that ensures that we don't leave any search context open. relates elastic/x-pack-elasticsearch#2878 Original commit: elastic/x-pack-elasticsearch@1052ea28dc162b635a93849a63e934ff974f1b3c --- docs/en/sql/endpoints/sql-endpoints.asciidoc | 1 + docs/en/sql/endpoints/sql-rest.asciidoc | 28 +- .../security/authz/AuthorizationService.java | 2 + .../xpack/sql/SqlClearCursorActionIT.java | 92 ++++++ .../api/xpack.sql.clear_cursor.json | 15 + .../{xpack.sql.json => xpack.sql.query.json} | 4 +- .../resources/rest-api-spec/test/sql/sql.yml | 83 +++++- .../qa/sql/cli/CliIntegrationTestCase.java | 5 +- .../xpack/qa/sql/jdbc/FetchSizeTestCase.java | 23 ++ .../qa/sql/jdbc/JdbcIntegrationTestCase.java | 10 + .../xpack/qa/sql/rest/RestSqlTestCase.java | 43 ++- .../xpack/sql/cli/net/protocol/Proto.java | 6 +- .../cli/net/protocol/QueryCloseRequest.java | 26 ++ .../cli/net/protocol/QueryCloseResponse.java | 34 +++ .../net/protocol/QueryCloseRequestTests.java | 28 ++ .../net/protocol/QueryCloseResponseTests.java | 28 ++ .../xpack/sql/cli/CliHttpClient.java | 9 + .../cli/command/ServerQueryCliCommand.java | 52 ++-- .../command/ServerQueryCliCommandTests.java | 18 ++ .../xpack/sql/jdbc/net/protocol/Proto.java | 4 +- .../jdbc/net/protocol/QueryCloseRequest.java | 32 +++ .../jdbc/net/protocol/QueryCloseResponse.java | 35 +++ .../net/protocol/QueryCloseRequestTests.java | 29 ++ .../net/protocol/QueryCloseResponseTests.java | 26 ++ .../sql/jdbc/jdbc/JdbcDatabaseMetaData.java | 5 + .../xpack/sql/jdbc/jdbc/JdbcResultSet.java | 3 +- .../xpack/sql/jdbc/jdbc/JdbcStatement.java | 6 +- .../xpack/sql/jdbc/net/client/Cursor.java | 2 + .../sql/jdbc/net/client/DefaultCursor.java | 7 + .../xpack/sql/jdbc/net/client/HttpClient.java | 2 +- .../sql/jdbc/net/client/JdbcHttpClient.java | 9 + .../xpack/sql/execution/PlanExecutor.java | 4 + .../search/AbstractSearchHitRowSet.java | 29 +- .../sql/execution/search/ScrollCursor.java | 28 +- .../xpack/sql/execution/search/Scroller.java | 45 +-- .../xpack/sql/plugin/CliFormatterCursor.java | 5 + .../xpack/sql/plugin/JdbcCursor.java | 5 + .../xpack/sql/plugin/RestSqlCliAction.java | 11 + .../xpack/sql/plugin/RestSqlJdbcAction.java | 11 + .../sql/plugin/SqlClearCursorAction.java | 261 ++++++++++++++++++ .../xpack/sql/plugin/SqlPlugin.java | 6 +- .../sql/plugin/sql/action/SqlRequest.java | 1 - .../plugin/sql/action/SqlRequestBuilder.java | 13 +- .../xpack/sql/session/Cursor.java | 5 + .../xpack/sql/session/EmptyCursor.java | 6 + .../sql/execution/search/CursorTests.java | 52 ++++ .../plugin/SqlClearCursorRequestTests.java | 38 +++ .../plugin/SqlClearCursorResponseTests.java | 27 ++ .../client/shared/JreHttpUrlConnection.java | 1 - .../shared/AbstractQueryCloseRequest.java | 49 ++++ .../shared/AbstractQueryCloseResponse.java | 57 ++++ 51 files changed, 1231 insertions(+), 90 deletions(-) create mode 100644 plugin/src/test/java/org/elasticsearch/xpack/sql/SqlClearCursorActionIT.java create mode 100644 plugin/src/test/resources/rest-api-spec/api/xpack.sql.clear_cursor.json rename plugin/src/test/resources/rest-api-spec/api/{xpack.sql.json => xpack.sql.query.json} (95%) create mode 100644 sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseRequest.java create mode 100644 sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseResponse.java create mode 100644 sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseRequestTests.java create mode 100644 sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseResponseTests.java create mode 100644 sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseRequest.java create mode 100644 sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseResponse.java create mode 100644 sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseRequestTests.java create mode 100644 sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseResponseTests.java create mode 100644 sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorAction.java create mode 100644 sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/CursorTests.java create mode 100644 sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorRequestTests.java create mode 100644 sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorResponseTests.java create mode 100644 sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryCloseRequest.java create mode 100644 sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryCloseResponse.java diff --git a/docs/en/sql/endpoints/sql-endpoints.asciidoc b/docs/en/sql/endpoints/sql-endpoints.asciidoc index d9bb59f3896..5ef2d7b39ac 100644 --- a/docs/en/sql/endpoints/sql-endpoints.asciidoc +++ b/docs/en/sql/endpoints/sql-endpoints.asciidoc @@ -1,4 +1,5 @@ include::sql-rest.asciidoc[] +include::sql-jdbc.asciidoc[] include::sql-translate.asciidoc[] include::sql-cli.asciidoc[] include::sql-jdbc.asciidoc[] diff --git a/docs/en/sql/endpoints/sql-rest.asciidoc b/docs/en/sql/endpoints/sql-rest.asciidoc index 48e29ab08ca..f6fcd00c41e 100644 --- a/docs/en/sql/endpoints/sql-rest.asciidoc +++ b/docs/en/sql/endpoints/sql-rest.asciidoc @@ -114,8 +114,32 @@ You've reached the last page when there is no `cursor` returned in the results. Like Elasticsearch's <>, SQL may keep state in Elasticsearch to support the cursor. Unlike scroll, receiving the last page is enough to guarantee that the -Elasticsearch state is cleared. For now, that is the only way to -clear the state. +Elasticsearch state is cleared. + +To clear the state earlier, you can use the clear cursor command: + +[source,js] +-------------------------------------------------- +POST /_xpack/sql/close +{ + "cursor": "sDXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAAEWYUpOYklQMHhRUEtld3RsNnFtYU1hQQ==:BAFmBGRhdGUBZgVsaWtlcwFzB21lc3NhZ2UBZgR1c2Vy9f///w8=" +} +-------------------------------------------------- +// CONSOLE +// TEST[continued] +// TEST[s/sDXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAAEWYUpOYklQMHhRUEtld3RsNnFtYU1hQQ==:BAFmBGRhdGUBZgVsaWtlcwFzB21lc3NhZ2UBZgR1c2Vy9f\/\/\/w8=/$body.cursor/] + +Which will like return the + +[source,js] +-------------------------------------------------- +{ + "succeeded" : true +} +-------------------------------------------------- +// TESTRESPONSE + + [[sql-rest-filtering]] diff --git a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java index 17ffdeac89c..6c88dfc318a 100644 --- a/plugin/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java +++ b/plugin/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java @@ -64,6 +64,7 @@ import org.elasticsearch.xpack.security.user.SystemUser; import org.elasticsearch.xpack.security.user.User; import org.elasticsearch.xpack.security.user.XPackSecurityUser; import org.elasticsearch.xpack.security.user.XPackUser; +import org.elasticsearch.xpack.sql.plugin.SqlClearCursorAction; import org.elasticsearch.xpack.sql.plugin.SqlTranslateAction; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; @@ -501,6 +502,7 @@ public class AuthorizationService extends AbstractComponent { action.equals(SearchTransportService.QUERY_SCROLL_ACTION_NAME) || action.equals(SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME) || action.equals(ClearScrollAction.NAME) || + action.equals(SqlClearCursorAction.NAME) || action.equals(SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME); } diff --git a/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlClearCursorActionIT.java b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlClearCursorActionIT.java new file mode 100644 index 00000000000..3964bb0cc4a --- /dev/null +++ b/plugin/src/test/java/org/elasticsearch/xpack/sql/SqlClearCursorActionIT.java @@ -0,0 +1,92 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql; + +import org.elasticsearch.action.bulk.BulkRequestBuilder; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.action.support.WriteRequest; +import org.elasticsearch.xpack.sql.plugin.SqlClearCursorAction; +import org.elasticsearch.xpack.sql.plugin.SqlClearCursorAction.Response; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; +import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse; +import org.elasticsearch.xpack.sql.session.Cursor; + +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.hamcrest.Matchers.not; +import static org.hamcrest.Matchers.notNullValue; + +public class SqlClearCursorActionIT extends AbstractSqlIntegTestCase { + + public void testSqlClearCursorAction() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test").get()); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + int indexSize = randomIntBetween(100, 300); + logger.info("Indexing {} records", indexSize); + for (int i = 0; i < indexSize; i++) { + bulkRequestBuilder.add(new IndexRequest("test", "doc", "id" + i).source("data", "bar", "count", i)); + } + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + ensureYellow("test"); + + assertEquals(0, getNumberOfSearchContexts()); + + int fetchSize = randomIntBetween(5, 20); + logger.info("Fetching {} records at a time", fetchSize); + SqlResponse sqlResponse = client().prepareExecute(SqlAction.INSTANCE).query("SELECT * FROM test").fetchSize(fetchSize).get(); + assertEquals(fetchSize, sqlResponse.size()); + + assertThat(getNumberOfSearchContexts(), greaterThan(0L)); + assertThat(sqlResponse.cursor(), notNullValue()); + assertThat(sqlResponse.cursor(), not(equalTo(Cursor.EMPTY))); + + Response cleanCursorResponse = client().prepareExecute(SqlClearCursorAction.INSTANCE).cursor(sqlResponse.cursor()).get(); + assertTrue(cleanCursorResponse.isSucceeded()); + + assertEquals(0, getNumberOfSearchContexts()); + } + + public void testAutoCursorCleanup() throws Exception { + assertAcked(client().admin().indices().prepareCreate("test").get()); + BulkRequestBuilder bulkRequestBuilder = client().prepareBulk(); + int indexSize = randomIntBetween(100, 300); + logger.info("Indexing {} records", indexSize); + for (int i = 0; i < indexSize; i++) { + bulkRequestBuilder.add(new IndexRequest("test", "doc", "id" + i).source("data", "bar", "count", i)); + } + bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get(); + ensureYellow("test"); + + assertEquals(0, getNumberOfSearchContexts()); + + int fetchSize = randomIntBetween(5, 20); + logger.info("Fetching {} records at a time", fetchSize); + SqlResponse sqlResponse = client().prepareExecute(SqlAction.INSTANCE).query("SELECT * FROM test").fetchSize(fetchSize).get(); + assertEquals(fetchSize, sqlResponse.size()); + + assertThat(getNumberOfSearchContexts(), greaterThan(0L)); + assertThat(sqlResponse.cursor(), notNullValue()); + assertThat(sqlResponse.cursor(), not(equalTo(Cursor.EMPTY))); + + long fetched = sqlResponse.size(); + do { + sqlResponse = client().prepareExecute(SqlAction.INSTANCE).cursor(sqlResponse.cursor()).get(); + fetched += sqlResponse.size(); + } while (sqlResponse.cursor().equals(Cursor.EMPTY) == false); + assertEquals(indexSize, fetched); + + Response cleanCursorResponse = client().prepareExecute(SqlClearCursorAction.INSTANCE).cursor(sqlResponse.cursor()).get(); + assertFalse(cleanCursorResponse.isSucceeded()); + + assertEquals(0, getNumberOfSearchContexts()); + } + + private long getNumberOfSearchContexts() { + return client().admin().indices().prepareStats("test").clear().setSearch(true).get() + .getIndex("test").getTotal().getSearch().getOpenContexts(); + } +} diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.sql.clear_cursor.json b/plugin/src/test/resources/rest-api-spec/api/xpack.sql.clear_cursor.json new file mode 100644 index 00000000000..d82e499c701 --- /dev/null +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.sql.clear_cursor.json @@ -0,0 +1,15 @@ +{ + "xpack.sql.clear_cursor": { + "documentation": "Clear SQL cursor", + "methods": [ "POST"], + "url": { + "path": "/_xpack/sql/close", + "paths": [ "/_xpack/sql/close" ], + "parts": {} + }, + "body": { + "description" : "Specify the cursor value in the `cursor` element to clean the cursor.", + "required" : true + } + } +} diff --git a/plugin/src/test/resources/rest-api-spec/api/xpack.sql.json b/plugin/src/test/resources/rest-api-spec/api/xpack.sql.query.json similarity index 95% rename from plugin/src/test/resources/rest-api-spec/api/xpack.sql.json rename to plugin/src/test/resources/rest-api-spec/api/xpack.sql.query.json index 768f8090c82..60bbcda8cad 100644 --- a/plugin/src/test/resources/rest-api-spec/api/xpack.sql.json +++ b/plugin/src/test/resources/rest-api-spec/api/xpack.sql.query.json @@ -1,5 +1,5 @@ { - "xpack.sql": { + "xpack.sql.query": { "documentation": "Execute SQL", "methods": [ "POST", "GET" ], "url": { @@ -18,4 +18,4 @@ "required" : true } } - } +} diff --git a/plugin/src/test/resources/rest-api-spec/test/sql/sql.yml b/plugin/src/test/resources/rest-api-spec/test/sql/sql.yml index 4274edecc05..033d3223002 100644 --- a/plugin/src/test/resources/rest-api-spec/test/sql/sql.yml +++ b/plugin/src/test/resources/rest-api-spec/test/sql/sql.yml @@ -1,5 +1,5 @@ --- -"Execute some SQL": +setup: - do: bulk: refresh: true @@ -16,9 +16,17 @@ _id: 2 - str: test2 int: 2 + - index: + _index: test + _type: doc + _id: 3 + - str: test3 + int: 3 +--- +"Execute some SQL": - do: - xpack.sql: + xpack.sql.query: format: json body: query: "SELECT * FROM test ORDER BY int asc" @@ -28,9 +36,50 @@ - match: { rows.0.1: test1 } - match: { rows.1.0: 2 } - match: { rows.1.1: test2 } + - match: { rows.2.0: 3 } + - match: { rows.2.1: test3 } + +--- +"Paging through results": + - do: + xpack.sql.query: + format: json + body: + query: "SELECT * FROM test ORDER BY int asc" + fetch_size: 2 + - match: { columns.0.name: int } + - match: { columns.1.name: str } + - match: { rows.0.0: 1 } + - match: { rows.0.1: test1 } + - match: { rows.1.0: 2 } + - match: { rows.1.1: test2 } + - is_true: cursor + - set: { cursor: cursor } - do: - xpack.sql: + xpack.sql.query: + format: json + body: + cursor: "$cursor" + - match: { rows.0.0: 3 } + - match: { rows.0.1: test3 } + - is_false: columns + - is_true: cursor + - set: { cursor: cursor } + + - do: + xpack.sql.query: + format: json + body: + cursor: "$cursor" + - is_false: columns + - is_false: cursor + - length: { rows: 0 } + +--- +"Getting textual representation": + - do: + xpack.sql.query: format: text body: query: "SELECT * FROM test ORDER BY int asc" @@ -40,4 +89,32 @@ ---------------\+---------------\n 1 \s+ \|test1 \s+ \n 2 \s+ \|test2 \s+ \n + 3 \s+ \|test3 \s+ \n $/ + +--- +"Clean cursor": + - do: + xpack.sql.query: + format: json + body: + query: "SELECT * FROM test ORDER BY int asc" + fetch_size: 2 + - match: { columns.0.name: int } + - match: { columns.1.name: str } + - match: { rows.0.0: 1 } + - match: { rows.0.1: test1 } + - is_true: cursor + - set: { cursor: cursor} + + - do: + xpack.sql.clear_cursor: + body: + cursor: "$cursor" + - match: { "succeeded": true } + + - do: + indices.stats: { index: 'test' } + + - match: { indices.test.total.search.open_contexts: 0 } + diff --git a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/cli/CliIntegrationTestCase.java b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/cli/CliIntegrationTestCase.java index 0c950467b04..241d3b6a76b 100644 --- a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/cli/CliIntegrationTestCase.java +++ b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/cli/CliIntegrationTestCase.java @@ -18,6 +18,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.transport.client.PreBuiltTransportClient; import org.elasticsearch.xpack.qa.sql.embed.CliHttpServer; +import org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase; import org.junit.After; import org.junit.Before; import org.junit.ClassRule; @@ -29,6 +30,7 @@ import java.security.AccessControlException; import java.util.function.Supplier; import static java.util.Collections.singletonMap; +import static org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase.assertNoSearchContexts; public abstract class CliIntegrationTestCase extends ESRestTestCase { /** @@ -67,12 +69,13 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase { } @After - public void orderlyShutdown() throws IOException, InterruptedException { + public void orderlyShutdown() throws Exception { if (cli == null) { // failed to connect to the cli so there is nothing to do here return; } cli.close(); + assertNoSearchContexts(); } /** diff --git a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/FetchSizeTestCase.java b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/FetchSizeTestCase.java index c5a44a48f8b..8d395c05709 100644 --- a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/FetchSizeTestCase.java +++ b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/FetchSizeTestCase.java @@ -7,6 +7,7 @@ package org.elasticsearch.xpack.qa.sql.jdbc; import org.apache.http.entity.ContentType; import org.apache.http.entity.StringEntity; +import org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase; import org.junit.Before; import java.io.IOException; @@ -16,6 +17,7 @@ import java.sql.SQLException; import java.sql.Statement; import static java.util.Collections.singletonMap; +import static org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase.assertNoSearchContexts; /** * Tests for setting {@link Statement#setFetchSize(int)} and @@ -52,6 +54,27 @@ public class FetchSizeTestCase extends JdbcIntegrationTestCase { } } + /** + * Test for {@code SELECT} that is implemented as a scroll query. + * In this test we don't retrieve all records and rely on close() to clean the cursor + */ + public void testIncompleteScroll() throws Exception { + try (Connection c = esJdbc(); + Statement s = c.createStatement()) { + s.setFetchSize(4); + try (ResultSet rs = s.executeQuery("SELECT * FROM test ORDER BY test_field ASC")) { + for (int i = 0; i < 10; i++) { + assertEquals(4, rs.getFetchSize()); + assertTrue(rs.next()); + assertEquals(i, rs.getInt(1)); + } + assertTrue(rs.next()); + } + } + assertNoSearchContexts(); + } + + /** * Test for {@code SELECT} that is implemented as an aggregation. * In this case the fetch size should be entirely ignored. diff --git a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/JdbcIntegrationTestCase.java b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/JdbcIntegrationTestCase.java index 59fe9a1dec4..01d1efd7850 100644 --- a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/JdbcIntegrationTestCase.java +++ b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/jdbc/JdbcIntegrationTestCase.java @@ -16,9 +16,12 @@ import org.elasticsearch.common.xcontent.XContentHelper; import org.elasticsearch.common.xcontent.json.JsonXContent; import org.elasticsearch.test.rest.ESRestTestCase; import org.elasticsearch.xpack.qa.sql.embed.EmbeddedJdbcServer; +import org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase; import org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcConfiguration; import org.elasticsearch.xpack.sql.jdbc.jdbcx.JdbcDataSource; import org.joda.time.DateTimeZone; +import org.junit.After; +import org.junit.Before; import org.junit.ClassRule; import java.io.IOException; @@ -35,6 +38,7 @@ import java.util.Set; import java.util.TimeZone; import static java.util.Collections.singletonMap; +import static org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase.assertNoSearchContexts; public abstract class JdbcIntegrationTestCase extends ESRestTestCase { /** @@ -52,6 +56,12 @@ public abstract class JdbcIntegrationTestCase extends ESRestTestCase { @ClassRule public static final EmbeddedJdbcServer EMBEDDED_SERVER = EMBED_SQL ? new EmbeddedJdbcServer() : null; + @After + public void checkSearchContent() throws Exception { + // Some context might linger due to fire and forget nature of scroll cleanup + assertNoSearchContexts(); + } + /** * Read an address for Elasticsearch suitable for the JDBC driver from the system properties. */ diff --git a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/rest/RestSqlTestCase.java b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/rest/RestSqlTestCase.java index 4079b89f1c2..5f390b0d73e 100644 --- a/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/rest/RestSqlTestCase.java +++ b/qa/sql/src/main/java/org/elasticsearch/xpack/qa/sql/rest/RestSqlTestCase.java @@ -275,9 +275,9 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON)); String expected = "test \n" + - "---------------\n" + - "test \n" + - "test \n"; + "---------------\n" + + "test \n" + + "test \n"; Tuple response = runSqlAsText("SELECT * FROM test"); logger.warn(expected); logger.warn(response.v1()); @@ -318,6 +318,11 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe expected.put("size", 0); expected.put("rows", emptyList()); assertResponse(expected, runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON))); + + Map response = runSql("/close", new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON)); + assertEquals(true, response.get("succeeded")); + + assertEquals(0, getNumberOfSearchContexts("test")); } private Tuple runSqlAsText(String sql) throws IOException { @@ -325,7 +330,7 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe } private Tuple runSqlAsText(String suffix, HttpEntity sql) throws IOException { - Response response = client().performRequest("POST", "/_xpack/sql" + suffix, singletonMap("error_trace", "true"), sql); + Response response = client().performRequest("POST", "/_xpack/sql" + suffix, singletonMap("error_trace", "true"), sql); return new Tuple<>( Streams.copyToString(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)), response.getHeader("Cursor") @@ -340,4 +345,34 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe } } + public static int getNumberOfSearchContexts(String index) throws IOException { + Response response = client().performRequest("GET", "/_stats/search"); + Map stats; + try (InputStream content = response.getEntity().getContent()) { + stats = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); + } + return getOpenContexts(stats, index); + } + + public static void assertNoSearchContexts() throws IOException { + Response response = client().performRequest("GET", "/_stats/search"); + Map stats; + try (InputStream content = response.getEntity().getContent()) { + stats = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false); + } + @SuppressWarnings("unchecked") + Map indexStats = (Map) stats.get("indices"); + for (String index : indexStats.keySet()) { + if (index.startsWith(".") == false) { // We are not interested in internal indices + assertEquals(index + " should have no search contexts", 0, getOpenContexts(stats, index)); + } + } + } + + @SuppressWarnings("unchecked") + public static int getOpenContexts(Map indexStats, String index) { + return (int) ((Map) ((Map) ((Map) ((Map) + indexStats.get("indices")).get(index)).get("total")).get("search")).get("open_contexts"); + } + } diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/Proto.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/Proto.java index c6358392551..d99404b4e11 100644 --- a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/Proto.java +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/Proto.java @@ -33,7 +33,8 @@ public final class Proto extends AbstractProto { public enum RequestType implements AbstractProto.RequestType { INFO(InfoRequest::new), QUERY_INIT(QueryInitRequest::new), - QUERY_PAGE(QueryPageRequest::new); + QUERY_PAGE(QueryPageRequest::new), + QUERY_CLOSE(QueryCloseRequest::new); private final RequestReader reader; @@ -64,7 +65,8 @@ public final class Proto extends AbstractProto { public enum ResponseType implements AbstractProto.ResponseType { INFO(InfoResponse::new), QUERY_INIT(QueryInitResponse::new), - QUERY_PAGE(QueryPageResponse::new); + QUERY_PAGE(QueryPageResponse::new), + QUERY_CLOSE(QueryCloseResponse::new); private final ResponseReader reader; diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseRequest.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseRequest.java new file mode 100644 index 00000000000..5d372092825 --- /dev/null +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseRequest.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryCloseRequest; +import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput; + +import java.io.IOException; + +public class QueryCloseRequest extends AbstractQueryCloseRequest { + public QueryCloseRequest(String cursor) { + super(cursor); + } + + QueryCloseRequest(SqlDataInput in) throws IOException { + super(in); + } + + @Override + public Proto.RequestType requestType() { + return Proto.RequestType.QUERY_CLOSE; + } +} diff --git a/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseResponse.java b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseResponse.java new file mode 100644 index 00000000000..dbb9d4e85e5 --- /dev/null +++ b/sql/cli-proto/src/main/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseResponse.java @@ -0,0 +1,34 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryCloseResponse; +import org.elasticsearch.xpack.sql.protocol.shared.Request; + +import java.io.DataInput; +import java.io.IOException; + +public class QueryCloseResponse extends AbstractQueryCloseResponse { + public QueryCloseResponse(boolean succeeded) { + super(succeeded); + } + + QueryCloseResponse(Request request, DataInput in) throws IOException { + super(request, in); + } + + @Override + public RequestType requestType() { + return RequestType.QUERY_CLOSE; + } + + @Override + public ResponseType responseType() { + return ResponseType.QUERY_CLOSE; + } +} diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseRequestTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseRequestTests.java new file mode 100644 index 00000000000..2c9bec200d0 --- /dev/null +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseRequestTests.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; + +import java.io.IOException; + +import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; + +public class QueryCloseRequestTests extends ESTestCase { + static QueryCloseRequest randomQueryCloseRequest() { + String cursor = randomAlphaOfLength(10); + return new QueryCloseRequest(cursor); + } + + public void testRoundTrip() throws IOException { + assertRoundTripCurrentVersion(randomQueryCloseRequest()); + } + + public void testToString() { + assertEquals("QueryCloseRequest<0320>", new QueryCloseRequest("0320").toString()); + } +} diff --git a/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseResponseTests.java b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseResponseTests.java new file mode 100644 index 00000000000..e888b78dba3 --- /dev/null +++ b/sql/cli-proto/src/test/java/org/elasticsearch/xpack/sql/cli/net/protocol/QueryCloseResponseTests.java @@ -0,0 +1,28 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.cli.net.protocol; + +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion; +import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseRequestTests.randomQueryCloseRequest; + +public class QueryCloseResponseTests extends ESTestCase { + static QueryCloseResponse randomQueryCloseResponse() { + return new QueryCloseResponse(randomBoolean()); + } + + public void testRoundTrip() throws IOException { + assertRoundTripCurrentVersion(randomQueryCloseRequest(), randomQueryCloseResponse()); + } + + public void testToString() { + assertEquals("QueryCloseResponse", + new QueryCloseResponse(true).toString()); + } +} diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java index 490b230de94..e32a013de02 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/CliHttpClient.java @@ -8,6 +8,8 @@ package org.elasticsearch.xpack.sql.cli; import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest; import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseRequest; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse; @@ -22,6 +24,8 @@ import java.security.AccessController; import java.security.PrivilegedAction; import java.sql.SQLException; import java.time.Instant; +import java.util.Collections; +import java.util.Map; import java.util.TimeZone; public class CliHttpClient { @@ -47,6 +51,11 @@ public class CliHttpClient { return (QueryResponse) post(request); } + public QueryCloseResponse queryClose(String cursor) throws SQLException { + QueryCloseRequest request = new QueryCloseRequest(cursor); + return (QueryCloseResponse) post(request); + } + private TimeoutInfo timeout() { long clientTime = Instant.now().toEpochMilli(); return new TimeoutInfo(clientTime, cfg.queryTimeout(), cfg.pageTimeout()); diff --git a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/ServerQueryCliCommand.java b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/ServerQueryCliCommand.java index 2be15ea7f8c..c75a3784101 100644 --- a/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/ServerQueryCliCommand.java +++ b/sql/cli/src/main/java/org/elasticsearch/xpack/sql/cli/command/ServerQueryCliCommand.java @@ -5,6 +5,7 @@ */ package org.elasticsearch.xpack.sql.cli.command; +import org.elasticsearch.xpack.sql.cli.CliHttpClient; import org.elasticsearch.xpack.sql.cli.CliTerminal; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse; import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection; @@ -20,42 +21,41 @@ public class ServerQueryCliCommand extends AbstractServerCliCommand { @Override protected boolean doHandle(CliTerminal terminal, CliSession cliSession, String line) { - QueryResponse response; + QueryResponse response = null; + CliHttpClient cliClient = cliSession.getClient(); try { - response = cliSession.getClient().queryInit(line, cliSession.getFetchSize()); + response = cliClient.queryInit(line, cliSession.getFetchSize()); + if (response.data.startsWith("digraph ")) { + handleGraphviz(terminal, response.data); + return true; + } + while (true) { + handleText(terminal, response.data); + if (response.cursor().isEmpty()) { + // Successfully finished the entire query! + terminal.flush(); + return true; + } + if (false == cliSession.getFetchSeparator().equals("")) { + terminal.println(cliSession.getFetchSeparator()); + } + response = cliSession.getClient().nextPage(response.cursor()); + } } catch (SQLException e) { if (JreHttpUrlConnection.SQL_STATE_BAD_SERVER.equals(e.getSQLState())) { terminal.error("Server error", e.getMessage()); } else { terminal.error("Bad request", e.getMessage()); } - return true; - } - if (response.data.startsWith("digraph ")) { - handleGraphviz(terminal, response.data); - return true; - } - while (true) { - handleText(terminal, response.data); - if (response.cursor().isEmpty()) { - // Successfully finished the entire query! - terminal.flush(); - return true; - } - if (false == cliSession.getFetchSeparator().equals("")) { - terminal.println(cliSession.getFetchSeparator()); - } - try { - response = cliSession.getClient().nextPage(response.cursor()); - } catch (SQLException e) { - if (JreHttpUrlConnection.SQL_STATE_BAD_SERVER.equals(e.getSQLState())) { - terminal.error("Server error", e.getMessage()); - } else { - terminal.error("Bad request", e.getMessage()); + if (response != null && response.cursor().isEmpty() == false) { + try { + cliClient.queryClose(response.cursor()); + } catch (SQLException ex) { + terminal.error("Could not close cursor", ex.getMessage()); } - return true; } } + return true; } private void handleText(CliTerminal terminal, String str) { diff --git a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/command/ServerQueryCliCommandTests.java b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/command/ServerQueryCliCommandTests.java index 02c9faa778e..f4b7c1b039f 100644 --- a/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/command/ServerQueryCliCommandTests.java +++ b/sql/cli/src/test/java/org/elasticsearch/xpack/sql/cli/command/ServerQueryCliCommandTests.java @@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.cli.command; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.xpack.sql.cli.CliHttpClient; import org.elasticsearch.xpack.sql.cli.TestTerminal; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse; @@ -81,4 +82,21 @@ public class ServerQueryCliCommandTests extends ESTestCase { verifyNoMoreInteractions(client); } + public void testCursorCleanupOnError() throws Exception { + TestTerminal testTerminal = new TestTerminal(); + CliHttpClient client = mock(CliHttpClient.class); + CliSession cliSession = new CliSession(client); + cliSession.setFetchSize(15); + when(client.queryInit("test query", 15)).thenReturn(new QueryInitResponse(123, "my_cursor1", "first")); + when(client.nextPage("my_cursor1")).thenThrow(new SQLException("test exception")); + when(client.queryClose("my_cursor1")).thenReturn(new QueryCloseResponse(true)); + ServerQueryCliCommand cliCommand = new ServerQueryCliCommand(); + assertTrue(cliCommand.handle(testTerminal, cliSession, "test query")); + assertEquals("firstBad request [test exception]\n", testTerminal.toString()); + verify(client, times(1)).queryInit(eq("test query"), eq(15)); + verify(client, times(1)).nextPage(any()); + verify(client, times(1)).queryClose(eq("my_cursor1")); + verifyNoMoreInteractions(client); + } + } \ No newline at end of file diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/Proto.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/Proto.java index a70c42e9f52..68ce28abd4f 100644 --- a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/Proto.java +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/Proto.java @@ -36,7 +36,7 @@ public final class Proto extends AbstractProto { META_COLUMN(MetaColumnRequest::new), QUERY_INIT(QueryInitRequest::new), QUERY_PAGE(QueryPageRequest::new), - // QUERY_CLOSE(QueryClosenRequest::new), TODO implement me + QUERY_CLOSE(QueryCloseRequest::new) ; private final RequestReader reader; @@ -71,7 +71,7 @@ public final class Proto extends AbstractProto { META_COLUMN(MetaColumnResponse::new), QUERY_INIT(QueryInitResponse::new), QUERY_PAGE(QueryPageResponse::new), -// QUERY_CLOSE(QueryClosenResponse::new) TODO implement me + QUERY_CLOSE(QueryCloseResponse::new) ; private final ResponseReader reader; diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseRequest.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseRequest.java new file mode 100644 index 00000000000..4211e63f082 --- /dev/null +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseRequest.java @@ -0,0 +1,32 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.jdbc.net.protocol; + +import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryCloseRequest; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryPageRequest; +import org.elasticsearch.xpack.sql.protocol.shared.Nullable; +import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; + +import java.io.IOException; + +public class QueryCloseRequest extends AbstractQueryCloseRequest { + + public QueryCloseRequest(String cursor) { + super(cursor); + } + + QueryCloseRequest(SqlDataInput in) throws IOException { + super(in); + } + + @Override + public RequestType requestType() { + return RequestType.QUERY_CLOSE; + } + +} diff --git a/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseResponse.java b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseResponse.java new file mode 100644 index 00000000000..81fd4516c06 --- /dev/null +++ b/sql/jdbc-proto/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseResponse.java @@ -0,0 +1,35 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.jdbc.net.protocol; + +import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractInfoResponse; +import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryCloseResponse; +import org.elasticsearch.xpack.sql.protocol.shared.Request; + +import java.io.DataInput; +import java.io.IOException; + +public class QueryCloseResponse extends AbstractQueryCloseResponse { + public QueryCloseResponse(boolean succeeded) { + super(succeeded); + } + + QueryCloseResponse(Request request, DataInput in) throws IOException { + super(request, in); + } + + @Override + public RequestType requestType() { + return RequestType.QUERY_CLOSE; + } + + @Override + public ResponseType responseType() { + return ResponseType.QUERY_CLOSE; + } +} \ No newline at end of file diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseRequestTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseRequestTests.java new file mode 100644 index 00000000000..13e575988f5 --- /dev/null +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseRequestTests.java @@ -0,0 +1,29 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.jdbc.net.protocol; + +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo; + +import java.io.IOException; + +import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion; +import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.randomTimeoutInfo; + +public class QueryCloseRequestTests extends ESTestCase { + static QueryCloseRequest randomQueryCloseRequest() { + String cursor = randomAlphaOfLength(10); + return new QueryCloseRequest(cursor); + } + + public void testRoundTrip() throws IOException { + assertRoundTripCurrentVersion(randomQueryCloseRequest()); + } + + public void testToString() { + assertEquals("QueryCloseRequest<123>", new QueryCloseRequest("123").toString()); + } +} diff --git a/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseResponseTests.java b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseResponseTests.java new file mode 100644 index 00000000000..45d7ee16d06 --- /dev/null +++ b/sql/jdbc-proto/src/test/java/org/elasticsearch/xpack/sql/jdbc/net/protocol/QueryCloseResponseTests.java @@ -0,0 +1,26 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.jdbc.net.protocol; + +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; + +import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion; + +public class QueryCloseResponseTests extends ESTestCase { + static QueryCloseResponse randomQueryCloseResponse() { + return new QueryCloseResponse(randomBoolean()); + } + + public void testRoundTrip() throws IOException { + assertRoundTripCurrentVersion(QueryCloseRequestTests::randomQueryCloseRequest, randomQueryCloseResponse()); + } + + public void testToString() { + assertEquals("QueryCloseResponse", new QueryCloseResponse(true).toString()); + } +} diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcDatabaseMetaData.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcDatabaseMetaData.java index e0a3d7debc8..fffda14927c 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcDatabaseMetaData.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcDatabaseMetaData.java @@ -1257,5 +1257,10 @@ class JdbcDatabaseMetaData implements DatabaseMetaData, JdbcWrapper { public int batchSize() { return data.length; } + + @Override + public void close() throws SQLException { + // this cursor doesn't hold any resource - no need to clean up + } } } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcResultSet.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcResultSet.java index bc7c7363994..e56c74193d7 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcResultSet.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcResultSet.java @@ -109,12 +109,13 @@ class JdbcResultSet implements ResultSet, JdbcWrapper { } @Override - public void close() { + public void close() throws SQLException { if (!closed) { closed = true; if (statement != null) { statement.resultSetWasClosed(); } + cursor.close(); } } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcStatement.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcStatement.java index 5c664d1abab..ae9a6621bee 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcStatement.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/jdbc/JdbcStatement.java @@ -48,7 +48,7 @@ class JdbcStatement implements Statement, JdbcWrapper { } @Override - public void close() { + public void close() throws SQLException { if (!closed) { closed = true; closeResultSet(); @@ -382,7 +382,7 @@ class JdbcStatement implements Statement, JdbcWrapper { } } - protected final void closeResultSet() { + protected final void closeResultSet() throws SQLException { if (rs != null) { ignoreResultSetClose = true; try { @@ -394,7 +394,7 @@ class JdbcStatement implements Statement, JdbcWrapper { } } - final void resultSetWasClosed() { + final void resultSetWasClosed() throws SQLException { if (closeOnCompletion && !ignoreResultSetClose) { close(); } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/Cursor.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/Cursor.java index 9262d96100e..5549bc57d63 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/Cursor.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/Cursor.java @@ -27,4 +27,6 @@ public interface Cursor { * server in the current batch. */ int batchSize(); + + void close() throws SQLException; } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java index 02cc8aa8d9c..8e984e9be7b 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/DefaultCursor.java @@ -57,4 +57,11 @@ class DefaultCursor implements Cursor { public int batchSize() { return page.rows(); } + + @Override + public void close() throws SQLException { + if (cursor.isEmpty() == false) { + client.queryClose(cursor); + } + } } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/HttpClient.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/HttpClient.java index 1a06cb19f29..74beeedc76a 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/HttpClient.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/HttpClient.java @@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.jdbc.net.client; import org.elasticsearch.xpack.sql.client.shared.ClientException; import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection; import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection.ResponseOrException; -import org.elasticsearch.xpack.sql.jdbc.JdbcException; import org.elasticsearch.xpack.sql.jdbc.JdbcSQLException; import org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcConfiguration; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto; @@ -60,4 +59,5 @@ class HttpClient { throw new JdbcSQLException(ex, "Transport failure"); } } + } diff --git a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java index 13638db33cc..02a4a7003f2 100644 --- a/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java +++ b/sql/jdbc/src/main/java/org/elasticsearch/xpack/sql/jdbc/net/client/JdbcHttpClient.java @@ -14,6 +14,8 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnResponse; import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest; import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Page; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseRequest; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseResponse; import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest; import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse; import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest; @@ -24,7 +26,9 @@ import java.io.DataInput; import java.io.IOException; import java.sql.SQLException; import java.time.Instant; +import java.util.Collections; import java.util.List; +import java.util.Map; public class JdbcHttpClient { @FunctionalInterface @@ -69,6 +73,11 @@ public class JdbcHttpClient { return ((QueryPageResponse) http.post(request)).cursor(); } + public boolean queryClose(String cursor) throws SQLException { + QueryCloseRequest request = new QueryCloseRequest(cursor); + return ((QueryCloseResponse) http.post(request)).succeeded(); + } + public InfoResponse serverInfo() throws SQLException { if (serverInfo == null) { serverInfo = fetchServerInfo(); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java index 82ae7f4158d..22549aa4de4 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/PlanExecutor.java @@ -71,4 +71,8 @@ public class PlanExecutor { public void nextPage(Configuration cfg, Cursor cursor, ActionListener listener) { cursor.nextPage(cfg, client, listener); } + + public void cleanCursor(Configuration cfg, Cursor cursor, ActionListener listener) { + cursor.clear(cfg, client, listener); + } } \ No newline at end of file diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AbstractSearchHitRowSet.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AbstractSearchHitRowSet.java index a538ecc4d05..5ac37c2482f 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AbstractSearchHitRowSet.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/AbstractSearchHitRowSet.java @@ -22,6 +22,7 @@ import java.util.Set; */ abstract class AbstractSearchHitRowSet extends AbstractRowSet { private final SearchHit[] hits; + private final Cursor cursor; private final String scrollId; private final List extractors; private final Set innerHits = new LinkedHashSet<>(); @@ -75,6 +76,21 @@ abstract class AbstractSearchHitRowSet extends AbstractRowSet { size = limitHits < 0 ? sz : Math.min(sz, limitHits); indexPerLevel = new int[maxDepth + 1]; this.innerHit = innerHit; + + if (scrollId == null) { + /* SearchResponse can contain a null scroll when you start a + * scroll but all results fit in the first page. */ + cursor = Cursor.EMPTY; + } else { + // compute remaining limit + int remainingLimit = limit - size; + // if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached + if (size == 0 || remainingLimit == 0) { + cursor = Cursor.EMPTY; + } else { + cursor = new ScrollCursor(scrollId, extractors, remainingLimit); + } + } } @Override @@ -152,17 +168,6 @@ abstract class AbstractSearchHitRowSet extends AbstractRowSet { @Override public Cursor nextPageCursor() { - if (scrollId == null) { - /* SearchResponse can contain a null scroll when you start a - * scroll but all results fit in the first page. */ - return Cursor.EMPTY; - } - // compute remaining limit - int remainingLimit = limit - size; - // if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached - if (size == 0 || remainingLimit == 0) { - return Cursor.EMPTY; - } - return new ScrollCursor(scrollId, extractors, remainingLimit); + return cursor; } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java index 732f1f86145..b75878997e6 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/ScrollCursor.java @@ -7,6 +7,8 @@ package org.elasticsearch.xpack.sql.execution.search; import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.search.ClearScrollResponse; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.action.search.SearchScrollRequest; import org.elasticsearch.client.Client; @@ -57,12 +59,25 @@ public class ScrollCursor implements Cursor { public void nextPage(Configuration cfg, Client client, ActionListener listener) { SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(cfg.pageTimeout()); client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> { - int limitHits = limit; - listener.onResponse(new ScrolledSearchHitRowSet(extractors, response.getHits().getHits(), - limitHits, response.getScrollId())); + ScrolledSearchHitRowSet rowSet = new ScrolledSearchHitRowSet(extractors, response.getHits().getHits(), + limit, response.getScrollId()); + if (rowSet.nextPageCursor() == Cursor.EMPTY ) { + // we are finished with this cursor, let's clean it before continuing + clear(cfg, client, ActionListener.wrap(success -> listener.onResponse(rowSet), listener::onFailure)); + } else { + listener.onResponse(rowSet); + } }, listener::onFailure)); } + @Override + public void clear(Configuration cfg, Client client, ActionListener listener) { + cleanCursor(client, scrollId, + ActionListener.wrap( + clearScrollResponse -> listener.onResponse(clearScrollResponse.isSucceeded()), + listener::onFailure)); + } + @Override public boolean equals(Object obj) { if (obj == null || obj.getClass() != getClass()) { @@ -83,4 +98,11 @@ public class ScrollCursor implements Cursor { public String toString() { return "cursor for scroll [" + scrollId + "]"; } + + public static void cleanCursor(Client client, String scrollId, ActionListener listener) { + ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); + clearScrollRequest.addScrollId(scrollId); + client.clearScroll(clearScrollRequest, listener); + } + } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java index d0031ac1dff..978ceced06b 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/execution/search/Scroller.java @@ -110,7 +110,7 @@ public class Scroller { } @Override - protected SchemaRowSet handleResponse(SearchResponse response) { + protected void handleResponse(SearchResponse response, ActionListener listener) { final List extractedAggs = new ArrayList<>(); AggValues aggValues = new AggValues(extractedAggs); @@ -156,9 +156,9 @@ public class Scroller { } aggValues.init(maxDepth, query.limit()); - clearScroll(response.getScrollId()); - - return new AggsRowSet(schema, aggValues, aggColumns); + clearScroll(response.getScrollId(), ActionListener.wrap( + succeeded -> listener.onResponse(new AggsRowSet(schema, aggValues, aggColumns)), + listener::onFailure)); } private Object[] extractAggValue(ColumnReference col, SearchResponse response) { @@ -230,7 +230,8 @@ public class Scroller { super.onResponse(response); } - protected SchemaRowSet handleResponse(SearchResponse response) { + @Override + protected void handleResponse(SearchResponse response, ActionListener listener) { SearchHit[] hits = response.getHits().getHits(); List exts = getExtractors(); @@ -239,23 +240,23 @@ public class Scroller { String scrollId = response.getScrollId(); // if there's an id, try to setup next scroll - if (scrollId != null) { + if (scrollId != null && // is all the content already retrieved? - if (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length + (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length // or maybe the limit has been reached - || (hits.length >= query.limit() && query.limit() > -1)) { + || (hits.length >= query.limit() && query.limit() > -1))) { // if so, clear the scroll - clearScroll(scrollId); - // and remove it to indicate no more data is expected - scrollId = null; - } + clearScroll(response.getScrollId(), ActionListener.wrap( + succeeded -> listener.onResponse(new InitialSearchHitRowSet(schema, exts, hits, query.limit(), null)), + listener::onFailure)); + } else { + listener.onResponse(new InitialSearchHitRowSet(schema, exts, hits, query.limit(), scrollId)); } - return new InitialSearchHitRowSet(schema, exts, hits, query.limit(), scrollId); } // no hits else { - clearScroll(response.getScrollId()); - return Rows.empty(schema); + clearScroll(response.getScrollId(), ActionListener.wrap(succeeded -> listener.onResponse(Rows.empty(schema)), + listener::onFailure)); } } @@ -321,18 +322,22 @@ public class Scroller { if (!CollectionUtils.isEmpty(failure)) { onFailure(new ExecutionException(failure[0].reason(), failure[0].getCause())); } - listener.onResponse(handleResponse(response)); + handleResponse(response, listener); } catch (Exception ex) { onFailure(ex); } } - protected abstract SchemaRowSet handleResponse(SearchResponse response); + protected abstract void handleResponse(SearchResponse response, ActionListener listener); - protected final void clearScroll(String scrollId) { + protected final void clearScroll(String scrollId, ActionListener listener) { if (scrollId != null) { - // fire and forget - client.prepareClearScroll().addScrollId(scrollId).execute(); + client.prepareClearScroll().addScrollId(scrollId).execute( + ActionListener.wrap( + clearScrollResponse -> listener.onResponse(clearScrollResponse.isSucceeded()), + listener::onFailure)); + } else { + listener.onResponse(false); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java index 974eed339ea..fc03f47f922 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/CliFormatterCursor.java @@ -61,6 +61,11 @@ public class CliFormatterCursor implements Cursor { delegate.nextPage(cfg, client, listener); } + @Override + public void clear(Configuration cfg, Client client, ActionListener listener) { + delegate.clear(cfg, client, listener); + } + @Override public String getWriteableName() { return NAME; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java index f7cbadb2588..cbe62182cb4 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/JdbcCursor.java @@ -70,6 +70,11 @@ public class JdbcCursor implements Cursor { delegate.nextPage(cfg, client, listener); } + @Override + public void clear(Configuration cfg, Client client, ActionListener listener) { + delegate.clear(cfg, client, listener); + } + @Override public String getWriteableName() { return NAME; diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java index e518354de00..f998f16850f 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlCliAction.java @@ -16,10 +16,12 @@ import org.elasticsearch.rest.RestController; import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.Proto; import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseRequest; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest; import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseResponse; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction; import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest; import org.elasticsearch.xpack.sql.protocol.shared.Request; @@ -64,6 +66,8 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { return queryInit(client, (QueryInitRequest) request); case QUERY_PAGE: return queryPage(client, (QueryPageRequest) request); + case QUERY_CLOSE: + return queryClose(client, (QueryCloseRequest) request); default: throw new IllegalArgumentException("Unsupported action [" + requestType + "]"); } @@ -102,4 +106,11 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction { Cursor.encodeToString(Version.CURRENT, CliFormatterCursor.wrap(response.cursor(), formatter)), data); })); } + + private Consumer queryClose(Client client, QueryCloseRequest request) { + Cursor cursor = Cursor.decodeFromString(request.cursor); + SqlClearCursorAction.Request sqlRequest = new SqlClearCursorAction.Request(cursor); + return channel -> client.execute(SqlClearCursorAction.INSTANCE, sqlRequest, toActionListener(channel, + response -> new QueryCloseResponse(response.isSucceeded()))); + } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java index c684672b86d..f3fb969a3f6 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/RestSqlJdbcAction.java @@ -25,6 +25,8 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest; import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto; import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseRequest; +import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseResponse; import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest; import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse; import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest; @@ -95,6 +97,8 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { return queryInit(client, (QueryInitRequest) request); case QUERY_PAGE: return queryPage(client, (QueryPageRequest) request); + case QUERY_CLOSE: + return queryClose(client, (QueryCloseRequest) request); default: throw new IllegalArgumentException("Unsupported action [" + requestType + "]"); } @@ -165,4 +169,11 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction { Cursor.encodeToString(Version.CURRENT, JdbcCursor.wrap(response.cursor(), types)), new SqlResponsePayload(types, response.rows())))); } + + private Consumer queryClose(Client client, QueryCloseRequest request) { + Cursor cursor = Cursor.decodeFromString(request.cursor); + SqlClearCursorAction.Request sqlRequest = new SqlClearCursorAction.Request(cursor); + return channel -> client.execute(SqlClearCursorAction.INSTANCE, sqlRequest, toActionListener(channel, + response -> new QueryCloseResponse(response.isSucceeded()))); + } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorAction.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorAction.java new file mode 100644 index 00000000000..1abca32187f --- /dev/null +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorAction.java @@ -0,0 +1,261 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.action.Action; +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.ActionRequestBuilder; +import org.elasticsearch.action.ActionRequestValidationException; +import org.elasticsearch.action.ActionResponse; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.client.ElasticsearchClient; +import org.elasticsearch.client.node.NodeClient; +import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; +import org.elasticsearch.common.ParseField; +import org.elasticsearch.common.inject.Inject; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.xcontent.ObjectParser; +import org.elasticsearch.common.xcontent.StatusToXContentObject; +import org.elasticsearch.common.xcontent.XContentBuilder; +import org.elasticsearch.common.xcontent.XContentParser; +import org.elasticsearch.rest.BaseRestHandler; +import org.elasticsearch.rest.RestController; +import org.elasticsearch.rest.RestRequest; +import org.elasticsearch.rest.RestStatus; +import org.elasticsearch.rest.action.RestToXContentListener; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.sql.execution.PlanExecutor; +import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor; + +import java.io.IOException; +import java.util.Objects; + +import static org.elasticsearch.action.ValidateActions.addValidationError; +import static org.elasticsearch.rest.RestRequest.Method.POST; +import static org.elasticsearch.rest.RestStatus.NOT_FOUND; +import static org.elasticsearch.rest.RestStatus.OK; + +public class SqlClearCursorAction + extends Action { + + public static final SqlClearCursorAction INSTANCE = new SqlClearCursorAction(); + public static final String NAME = "indices:data/read/sql/close_cursor"; + + private SqlClearCursorAction() { + super(NAME); + } + + @Override + public RequestBuilder newRequestBuilder(ElasticsearchClient client) { + return new RequestBuilder(client, this); + } + + @Override + public Response newResponse() { + return new Response(); + } + + public static class Request extends ActionRequest { + + public static final ObjectParser PARSER = new ObjectParser<>(NAME, Request::new); + + public static final ParseField CURSOR = new ParseField("cursor"); + + static { + PARSER.declareString((request, nextPage) -> request.setCursor(Cursor.decodeFromString(nextPage)), CURSOR); + } + + private Cursor cursor; + + public Request() { + + } + + public Request(Cursor cursor) { + this.cursor = cursor; + } + + @Override + public ActionRequestValidationException validate() { + ActionRequestValidationException validationException = null; + if (getCursor() == null) { + validationException = addValidationError("cursor is required", validationException); + } + return validationException; + } + + public Cursor getCursor() { + return cursor; + } + + public Request setCursor(Cursor cursor) { + this.cursor = cursor; + return this; + } + + @Override + public String getDescription() { + return "SQL Clean cursor [" + getCursor() + "]"; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + cursor = in.readNamedWriteable(Cursor.class); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeNamedWriteable(cursor); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Request request = (Request) o; + return Objects.equals(cursor, request.cursor); + } + + @Override + public int hashCode() { + return Objects.hash(cursor); + } + } + + public static class RequestBuilder extends ActionRequestBuilder { + public RequestBuilder(ElasticsearchClient client, SqlClearCursorAction action, Cursor cursor) { + super(client, action, new Request(cursor)); + } + + public RequestBuilder(ElasticsearchClient client, SqlClearCursorAction action) { + super(client, action, new Request()); + } + + public RequestBuilder cursor(Cursor cursor) { + request.setCursor(cursor); + return this; + } + } + + public static class Response extends ActionResponse implements StatusToXContentObject { + + private static final ParseField SUCCEEDED = new ParseField("succeeded"); + + private boolean succeeded; + + public Response(boolean succeeded) { + this.succeeded = succeeded; + } + + Response() { + } + + /** + * @return Whether the attempt to clear a cursor was successful. + */ + public boolean isSucceeded() { + return succeeded; + } + + public Response setSucceeded(boolean succeeded) { + this.succeeded = succeeded; + return this; + } + + @Override + public RestStatus status() { + return succeeded ? NOT_FOUND : OK; + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.startObject(); + builder.field(SUCCEEDED.getPreferredName(), succeeded); + builder.endObject(); + return builder; + } + + @Override + public void readFrom(StreamInput in) throws IOException { + super.readFrom(in); + succeeded = in.readBoolean(); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(succeeded); + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + Response response = (Response) o; + return succeeded == response.succeeded; + } + + @Override + public int hashCode() { + return Objects.hash(succeeded); + } + } + + + public static class TransportAction extends HandledTransportAction { + private final PlanExecutor planExecutor; + private final SqlLicenseChecker sqlLicenseChecker; + + @Inject + public TransportAction(Settings settings, ThreadPool threadPool, + TransportService transportService, ActionFilters actionFilters, + IndexNameExpressionResolver indexNameExpressionResolver, + PlanExecutor planExecutor, + SqlLicenseChecker sqlLicenseChecker) { + super(settings, NAME, threadPool, transportService, actionFilters, + indexNameExpressionResolver, Request::new); + this.planExecutor = planExecutor; + this.sqlLicenseChecker = sqlLicenseChecker; + } + + @Override + protected void doExecute(Request request, ActionListener listener) { + sqlLicenseChecker.checkIfSqlAllowed(); + Cursor cursor = request.getCursor(); + planExecutor.cleanCursor(Configuration.DEFAULT, cursor, ActionListener.wrap( + success -> listener.onResponse(new Response(success)), listener::onFailure)); + } + } + + public static class RestAction extends BaseRestHandler { + public RestAction(Settings settings, RestController controller) { + super(settings); + controller.registerHandler(POST, "/_xpack/sql/close", this); + } + + @Override + protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException { + Request sqlRequest; + try (XContentParser parser = request.contentOrSourceParamParser()) { + sqlRequest = Request.PARSER.apply(parser, null); + } + return channel -> client.executeLocally(SqlClearCursorAction.INSTANCE, sqlRequest, new RestToXContentListener<>(channel)); + } + + @Override + public String getName() { + return "sql_translate_action"; + } + } +} diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java index ddc67f6af7a..e8ef6b26b97 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/SqlPlugin.java @@ -70,7 +70,8 @@ public class SqlPlugin implements ActionPlugin { return Arrays.asList(new RestSqlAction(settings, restController), new SqlTranslateAction.RestAction(settings, restController), new RestSqlCliAction(settings, restController), - new RestSqlJdbcAction(settings, restController, sqlLicenseChecker, indexResolver)); + new RestSqlJdbcAction(settings, restController, sqlLicenseChecker, indexResolver), + new SqlClearCursorAction.RestAction(settings, restController)); } @Override @@ -80,6 +81,7 @@ public class SqlPlugin implements ActionPlugin { } return Arrays.asList(new ActionHandler<>(SqlAction.INSTANCE, TransportSqlAction.class), - new ActionHandler<>(SqlTranslateAction.INSTANCE, SqlTranslateAction.TransportAction.class)); + new ActionHandler<>(SqlTranslateAction.INSTANCE, SqlTranslateAction.TransportAction.class), + new ActionHandler<>(SqlClearCursorAction.INSTANCE, SqlClearCursorAction.TransportAction.class)); } } diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java index ffc7eca0e6b..4698095fd3f 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequest.java @@ -74,7 +74,6 @@ public class SqlRequest extends AbstractSqlRequest { return this; } - @Override public void readFrom(StreamInput in) throws IOException { super.readFrom(in); diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequestBuilder.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequestBuilder.java index b417a3ddaaf..44f1bc60f7f 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequestBuilder.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/plugin/sql/action/SqlRequestBuilder.java @@ -33,13 +33,13 @@ public class SqlRequestBuilder extends ActionRequestBuilder listener); + /** + * Cleans the resources associated with the cursor + */ + void clear(Configuration cfg, Client client, ActionListener listener); + /** * The {@link NamedWriteable}s required to deserialize {@link Cursor}s. */ diff --git a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java index 9da75b3b6b0..7e640c446cf 100644 --- a/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java +++ b/sql/server/src/main/java/org/elasticsearch/xpack/sql/session/EmptyCursor.java @@ -34,6 +34,12 @@ class EmptyCursor implements Cursor { throw new IllegalArgumentException("there is no next page"); } + @Override + public void clear(Configuration cfg, Client client, ActionListener listener) { + // There is nothing to clean + listener.onResponse(false); + } + @Override public boolean equals(Object obj) { return obj == this; diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/CursorTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/CursorTests.java new file mode 100644 index 00000000000..41ba50599cb --- /dev/null +++ b/sql/server/src/test/java/org/elasticsearch/xpack/sql/execution/search/CursorTests.java @@ -0,0 +1,52 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.execution.search; + +import org.elasticsearch.action.ActionListener; +import org.elasticsearch.action.search.ClearScrollRequest; +import org.elasticsearch.action.support.PlainActionFuture; +import org.elasticsearch.client.Client; +import org.elasticsearch.test.ESTestCase; +import org.elasticsearch.xpack.sql.session.Configuration; +import org.elasticsearch.xpack.sql.session.Cursor; +import org.mockito.ArgumentCaptor; + +import java.util.Collections; + +import static org.elasticsearch.action.support.PlainActionFuture.newFuture; +import static org.hamcrest.Matchers.hasSize; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyZeroInteractions; + +public class CursorTests extends ESTestCase { + + public void testEmptyCursorClearCursor() { + Client clientMock = mock(Client.class); + Cursor cursor = Cursor.EMPTY; + PlainActionFuture future = newFuture(); + cursor.clear(Configuration.DEFAULT, clientMock, future); + assertFalse(future.actionGet()); + verifyZeroInteractions(clientMock); + } + + @SuppressWarnings("unchecked") + public void testScrollCursorClearCursor() { + Client clientMock = mock(Client.class); + ActionListener listenerMock = mock(ActionListener.class); + String cursorString = randomAlphaOfLength(10); + Cursor cursor = new ScrollCursor(cursorString, Collections.emptyList(), randomInt()); + + cursor.clear(Configuration.DEFAULT, clientMock, listenerMock); + + ArgumentCaptor request = ArgumentCaptor.forClass(ClearScrollRequest.class); + verify(clientMock).clearScroll(request.capture(), any(ActionListener.class)); + assertEquals(Collections.singletonList(cursorString), request.getValue().getScrollIds()); + verifyZeroInteractions(listenerMock); + } + +} diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorRequestTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorRequestTests.java new file mode 100644 index 00000000000..5193a797018 --- /dev/null +++ b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorRequestTests.java @@ -0,0 +1,38 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.common.io.stream.NamedWriteableRegistry; +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils.MutateFunction; +import org.elasticsearch.xpack.sql.session.Cursor; + +import static org.elasticsearch.xpack.sql.execution.search.ScrollCursorTests.randomScrollCursor; + +public class SqlClearCursorRequestTests extends AbstractStreamableTestCase { + + @Override + protected SqlClearCursorAction.Request createTestInstance() { + return new SqlClearCursorAction.Request(randomScrollCursor()); + } + + @Override + protected SqlClearCursorAction.Request createBlankInstance() { + return new SqlClearCursorAction.Request(); + } + + @Override + @SuppressWarnings("unchecked") + protected MutateFunction getMutateFunction() { + return request -> getCopyFunction().copy(request).setCursor(randomScrollCursor()); + } + + @Override + protected NamedWriteableRegistry getNamedWriteableRegistry() { + return new NamedWriteableRegistry(Cursor.getNamedWriteables()); + } + +} diff --git a/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorResponseTests.java b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorResponseTests.java new file mode 100644 index 00000000000..a72b0901426 --- /dev/null +++ b/sql/server/src/test/java/org/elasticsearch/xpack/sql/plugin/SqlClearCursorResponseTests.java @@ -0,0 +1,27 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.plugin; + +import org.elasticsearch.test.AbstractStreamableTestCase; +import org.elasticsearch.test.EqualsHashCodeTestUtils.MutateFunction; + +public class SqlClearCursorResponseTests extends AbstractStreamableTestCase { + + @Override + protected SqlClearCursorAction.Response createTestInstance() { + return new SqlClearCursorAction.Response(randomBoolean()); + } + + @Override + protected SqlClearCursorAction.Response createBlankInstance() { + return new SqlClearCursorAction.Response(); + } + + @Override + protected MutateFunction getMutateFunction() { + return response -> getCopyFunction().copy(response).setSucceeded(response.isSucceeded() == false); + } +} diff --git a/sql/shared-client/src/main/java/org/elasticsearch/xpack/sql/client/shared/JreHttpUrlConnection.java b/sql/shared-client/src/main/java/org/elasticsearch/xpack/sql/client/shared/JreHttpUrlConnection.java index 74c023f3632..3bbd6fa9db5 100644 --- a/sql/shared-client/src/main/java/org/elasticsearch/xpack/sql/client/shared/JreHttpUrlConnection.java +++ b/sql/shared-client/src/main/java/org/elasticsearch/xpack/sql/client/shared/JreHttpUrlConnection.java @@ -29,7 +29,6 @@ import java.sql.SQLInvalidAuthorizationSpecException; import java.sql.SQLRecoverableException; import java.sql.SQLSyntaxErrorException; import java.sql.SQLTimeoutException; -import java.util.Arrays; import java.util.Base64; import java.util.function.Function; import java.util.zip.GZIPInputStream; diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryCloseRequest.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryCloseRequest.java new file mode 100644 index 00000000000..b2e7f895373 --- /dev/null +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryCloseRequest.java @@ -0,0 +1,49 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.protocol.shared; + +import java.io.IOException; +import java.util.Objects; + +public abstract class AbstractQueryCloseRequest extends Request { + public final String cursor; + + protected AbstractQueryCloseRequest(String cursor) { + if (cursor == null) { + throw new IllegalArgumentException("[cursor] must not be null"); + } + this.cursor = cursor; + } + + protected AbstractQueryCloseRequest(SqlDataInput in) throws IOException { + this.cursor = in.readUTF(); + } + + @Override + public void writeTo(SqlDataOutput out) throws IOException { + out.writeUTF(cursor); + } + + @Override + protected String toStringBody() { + return cursor; + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + AbstractQueryCloseRequest other = (AbstractQueryCloseRequest) obj; + return Objects.equals(cursor, other.cursor); + } + + @Override + public int hashCode() { + return Objects.hash(cursor); + } +} + diff --git a/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryCloseResponse.java b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryCloseResponse.java new file mode 100644 index 00000000000..479f2d8b25b --- /dev/null +++ b/sql/shared-proto/src/main/java/org/elasticsearch/xpack/sql/protocol/shared/AbstractQueryCloseResponse.java @@ -0,0 +1,57 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License; + * you may not use this file except in compliance with the Elastic License. + */ +package org.elasticsearch.xpack.sql.protocol.shared; + +import java.io.DataInput; +import java.io.IOException; +import java.util.Objects; + +/** + * Superclass for responses both for {@link AbstractQueryInitRequest} + * and {@link AbstractQueryPageRequest}. + */ +public abstract class AbstractQueryCloseResponse extends Response { + private final boolean succeeded; + + protected AbstractQueryCloseResponse(boolean succeeded) { + this.succeeded = succeeded; + } + + protected AbstractQueryCloseResponse(Request request, DataInput in) throws IOException { + succeeded = in.readBoolean(); + } + + @Override + protected void writeTo(SqlDataOutput out) throws IOException { + out.writeBoolean(succeeded); + } + + /** + * True if the cursor was really closed + */ + public boolean succeeded() { + return succeeded; + } + + @Override + protected String toStringBody() { + return Boolean.toString(succeeded); + } + + @Override + public boolean equals(Object obj) { + if (obj == null || obj.getClass() != getClass()) { + return false; + } + AbstractQueryCloseResponse other = (AbstractQueryCloseResponse) obj; + return succeeded == other.succeeded; + } + + @Override + public int hashCode() { + return Objects.hash(succeeded); + } +}