SQL: Add ability to close cursors (elastic/x-pack-elasticsearch#3249)

This commits adds a new end point for closing in-flight cursors, it also ensures that all cursors are properly closed by adding after test checks that ensures that we don't leave any search context open.

relates elastic/x-pack-elasticsearch#2878

Original commit: elastic/x-pack-elasticsearch@1052ea28dc
This commit is contained in:
Igor Motov 2017-12-11 11:36:02 -05:00 committed by GitHub
parent fab3712e3d
commit 4bebc307c3
51 changed files with 1231 additions and 90 deletions

View File

@ -1,4 +1,5 @@
include::sql-rest.asciidoc[]
include::sql-jdbc.asciidoc[]
include::sql-translate.asciidoc[]
include::sql-cli.asciidoc[]
include::sql-jdbc.asciidoc[]

View File

@ -114,8 +114,32 @@ You've reached the last page when there is no `cursor` returned
in the results. Like Elasticsearch's <<search-request-scroll,scroll>>,
SQL may keep state in Elasticsearch to support the cursor. Unlike
scroll, receiving the last page is enough to guarantee that the
Elasticsearch state is cleared. For now, that is the only way to
clear the state.
Elasticsearch state is cleared.
To clear the state earlier, you can use the clear cursor command:
[source,js]
--------------------------------------------------
POST /_xpack/sql/close
{
"cursor": "sDXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAAEWYUpOYklQMHhRUEtld3RsNnFtYU1hQQ==:BAFmBGRhdGUBZgVsaWtlcwFzB21lc3NhZ2UBZgR1c2Vy9f///w8="
}
--------------------------------------------------
// CONSOLE
// TEST[continued]
// TEST[s/sDXF1ZXJ5QW5kRmV0Y2gBAAAAAAAAAAEWYUpOYklQMHhRUEtld3RsNnFtYU1hQQ==:BAFmBGRhdGUBZgVsaWtlcwFzB21lc3NhZ2UBZgR1c2Vy9f\/\/\/w8=/$body.cursor/]
Which will like return the
[source,js]
--------------------------------------------------
{
"succeeded" : true
}
--------------------------------------------------
// TESTRESPONSE
[[sql-rest-filtering]]

View File

@ -64,6 +64,7 @@ import org.elasticsearch.xpack.security.user.SystemUser;
import org.elasticsearch.xpack.security.user.User;
import org.elasticsearch.xpack.security.user.XPackSecurityUser;
import org.elasticsearch.xpack.security.user.XPackUser;
import org.elasticsearch.xpack.sql.plugin.SqlClearCursorAction;
import org.elasticsearch.xpack.sql.plugin.SqlTranslateAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
@ -501,6 +502,7 @@ public class AuthorizationService extends AbstractComponent {
action.equals(SearchTransportService.QUERY_SCROLL_ACTION_NAME) ||
action.equals(SearchTransportService.FREE_CONTEXT_SCROLL_ACTION_NAME) ||
action.equals(ClearScrollAction.NAME) ||
action.equals(SqlClearCursorAction.NAME) ||
action.equals(SearchTransportService.CLEAR_SCROLL_CONTEXTS_ACTION_NAME);
}

View File

@ -0,0 +1,92 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.support.WriteRequest;
import org.elasticsearch.xpack.sql.plugin.SqlClearCursorAction;
import org.elasticsearch.xpack.sql.plugin.SqlClearCursorAction.Response;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlResponse;
import org.elasticsearch.xpack.sql.session.Cursor;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
public class SqlClearCursorActionIT extends AbstractSqlIntegTestCase {
public void testSqlClearCursorAction() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").get());
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
int indexSize = randomIntBetween(100, 300);
logger.info("Indexing {} records", indexSize);
for (int i = 0; i < indexSize; i++) {
bulkRequestBuilder.add(new IndexRequest("test", "doc", "id" + i).source("data", "bar", "count", i));
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
ensureYellow("test");
assertEquals(0, getNumberOfSearchContexts());
int fetchSize = randomIntBetween(5, 20);
logger.info("Fetching {} records at a time", fetchSize);
SqlResponse sqlResponse = client().prepareExecute(SqlAction.INSTANCE).query("SELECT * FROM test").fetchSize(fetchSize).get();
assertEquals(fetchSize, sqlResponse.size());
assertThat(getNumberOfSearchContexts(), greaterThan(0L));
assertThat(sqlResponse.cursor(), notNullValue());
assertThat(sqlResponse.cursor(), not(equalTo(Cursor.EMPTY)));
Response cleanCursorResponse = client().prepareExecute(SqlClearCursorAction.INSTANCE).cursor(sqlResponse.cursor()).get();
assertTrue(cleanCursorResponse.isSucceeded());
assertEquals(0, getNumberOfSearchContexts());
}
public void testAutoCursorCleanup() throws Exception {
assertAcked(client().admin().indices().prepareCreate("test").get());
BulkRequestBuilder bulkRequestBuilder = client().prepareBulk();
int indexSize = randomIntBetween(100, 300);
logger.info("Indexing {} records", indexSize);
for (int i = 0; i < indexSize; i++) {
bulkRequestBuilder.add(new IndexRequest("test", "doc", "id" + i).source("data", "bar", "count", i));
}
bulkRequestBuilder.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE).get();
ensureYellow("test");
assertEquals(0, getNumberOfSearchContexts());
int fetchSize = randomIntBetween(5, 20);
logger.info("Fetching {} records at a time", fetchSize);
SqlResponse sqlResponse = client().prepareExecute(SqlAction.INSTANCE).query("SELECT * FROM test").fetchSize(fetchSize).get();
assertEquals(fetchSize, sqlResponse.size());
assertThat(getNumberOfSearchContexts(), greaterThan(0L));
assertThat(sqlResponse.cursor(), notNullValue());
assertThat(sqlResponse.cursor(), not(equalTo(Cursor.EMPTY)));
long fetched = sqlResponse.size();
do {
sqlResponse = client().prepareExecute(SqlAction.INSTANCE).cursor(sqlResponse.cursor()).get();
fetched += sqlResponse.size();
} while (sqlResponse.cursor().equals(Cursor.EMPTY) == false);
assertEquals(indexSize, fetched);
Response cleanCursorResponse = client().prepareExecute(SqlClearCursorAction.INSTANCE).cursor(sqlResponse.cursor()).get();
assertFalse(cleanCursorResponse.isSucceeded());
assertEquals(0, getNumberOfSearchContexts());
}
private long getNumberOfSearchContexts() {
return client().admin().indices().prepareStats("test").clear().setSearch(true).get()
.getIndex("test").getTotal().getSearch().getOpenContexts();
}
}

View File

@ -0,0 +1,15 @@
{
"xpack.sql.clear_cursor": {
"documentation": "Clear SQL cursor",
"methods": [ "POST"],
"url": {
"path": "/_xpack/sql/close",
"paths": [ "/_xpack/sql/close" ],
"parts": {}
},
"body": {
"description" : "Specify the cursor value in the `cursor` element to clean the cursor.",
"required" : true
}
}
}

View File

@ -1,5 +1,5 @@
{
"xpack.sql": {
"xpack.sql.query": {
"documentation": "Execute SQL",
"methods": [ "POST", "GET" ],
"url": {
@ -18,4 +18,4 @@
"required" : true
}
}
}
}

View File

@ -1,5 +1,5 @@
---
"Execute some SQL":
setup:
- do:
bulk:
refresh: true
@ -16,9 +16,17 @@
_id: 2
- str: test2
int: 2
- index:
_index: test
_type: doc
_id: 3
- str: test3
int: 3
---
"Execute some SQL":
- do:
xpack.sql:
xpack.sql.query:
format: json
body:
query: "SELECT * FROM test ORDER BY int asc"
@ -28,9 +36,50 @@
- match: { rows.0.1: test1 }
- match: { rows.1.0: 2 }
- match: { rows.1.1: test2 }
- match: { rows.2.0: 3 }
- match: { rows.2.1: test3 }
---
"Paging through results":
- do:
xpack.sql.query:
format: json
body:
query: "SELECT * FROM test ORDER BY int asc"
fetch_size: 2
- match: { columns.0.name: int }
- match: { columns.1.name: str }
- match: { rows.0.0: 1 }
- match: { rows.0.1: test1 }
- match: { rows.1.0: 2 }
- match: { rows.1.1: test2 }
- is_true: cursor
- set: { cursor: cursor }
- do:
xpack.sql:
xpack.sql.query:
format: json
body:
cursor: "$cursor"
- match: { rows.0.0: 3 }
- match: { rows.0.1: test3 }
- is_false: columns
- is_true: cursor
- set: { cursor: cursor }
- do:
xpack.sql.query:
format: json
body:
cursor: "$cursor"
- is_false: columns
- is_false: cursor
- length: { rows: 0 }
---
"Getting textual representation":
- do:
xpack.sql.query:
format: text
body:
query: "SELECT * FROM test ORDER BY int asc"
@ -40,4 +89,32 @@
---------------\+---------------\n
1 \s+ \|test1 \s+ \n
2 \s+ \|test2 \s+ \n
3 \s+ \|test3 \s+ \n
$/
---
"Clean cursor":
- do:
xpack.sql.query:
format: json
body:
query: "SELECT * FROM test ORDER BY int asc"
fetch_size: 2
- match: { columns.0.name: int }
- match: { columns.1.name: str }
- match: { rows.0.0: 1 }
- match: { rows.0.1: test1 }
- is_true: cursor
- set: { cursor: cursor}
- do:
xpack.sql.clear_cursor:
body:
cursor: "$cursor"
- match: { "succeeded": true }
- do:
indices.stats: { index: 'test' }
- match: { indices.test.total.search.open_contexts: 0 }

View File

@ -18,6 +18,7 @@ import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.elasticsearch.xpack.qa.sql.embed.CliHttpServer;
import org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
@ -29,6 +30,7 @@ import java.security.AccessControlException;
import java.util.function.Supplier;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase.assertNoSearchContexts;
public abstract class CliIntegrationTestCase extends ESRestTestCase {
/**
@ -67,12 +69,13 @@ public abstract class CliIntegrationTestCase extends ESRestTestCase {
}
@After
public void orderlyShutdown() throws IOException, InterruptedException {
public void orderlyShutdown() throws Exception {
if (cli == null) {
// failed to connect to the cli so there is nothing to do here
return;
}
cli.close();
assertNoSearchContexts();
}
/**

View File

@ -7,6 +7,7 @@ package org.elasticsearch.xpack.qa.sql.jdbc;
import org.apache.http.entity.ContentType;
import org.apache.http.entity.StringEntity;
import org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase;
import org.junit.Before;
import java.io.IOException;
@ -16,6 +17,7 @@ import java.sql.SQLException;
import java.sql.Statement;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase.assertNoSearchContexts;
/**
* Tests for setting {@link Statement#setFetchSize(int)} and
@ -52,6 +54,27 @@ public class FetchSizeTestCase extends JdbcIntegrationTestCase {
}
}
/**
* Test for {@code SELECT} that is implemented as a scroll query.
* In this test we don't retrieve all records and rely on close() to clean the cursor
*/
public void testIncompleteScroll() throws Exception {
try (Connection c = esJdbc();
Statement s = c.createStatement()) {
s.setFetchSize(4);
try (ResultSet rs = s.executeQuery("SELECT * FROM test ORDER BY test_field ASC")) {
for (int i = 0; i < 10; i++) {
assertEquals(4, rs.getFetchSize());
assertTrue(rs.next());
assertEquals(i, rs.getInt(1));
}
assertTrue(rs.next());
}
}
assertNoSearchContexts();
}
/**
* Test for {@code SELECT} that is implemented as an aggregation.
* In this case the fetch size should be entirely ignored.

View File

@ -16,9 +16,12 @@ import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.json.JsonXContent;
import org.elasticsearch.test.rest.ESRestTestCase;
import org.elasticsearch.xpack.qa.sql.embed.EmbeddedJdbcServer;
import org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase;
import org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcConfiguration;
import org.elasticsearch.xpack.sql.jdbc.jdbcx.JdbcDataSource;
import org.joda.time.DateTimeZone;
import org.junit.After;
import org.junit.Before;
import org.junit.ClassRule;
import java.io.IOException;
@ -35,6 +38,7 @@ import java.util.Set;
import java.util.TimeZone;
import static java.util.Collections.singletonMap;
import static org.elasticsearch.xpack.qa.sql.rest.RestSqlTestCase.assertNoSearchContexts;
public abstract class JdbcIntegrationTestCase extends ESRestTestCase {
/**
@ -52,6 +56,12 @@ public abstract class JdbcIntegrationTestCase extends ESRestTestCase {
@ClassRule
public static final EmbeddedJdbcServer EMBEDDED_SERVER = EMBED_SQL ? new EmbeddedJdbcServer() : null;
@After
public void checkSearchContent() throws Exception {
// Some context might linger due to fire and forget nature of scroll cleanup
assertNoSearchContexts();
}
/**
* Read an address for Elasticsearch suitable for the JDBC driver from the system properties.
*/

View File

@ -275,9 +275,9 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe
new StringEntity(bulk.toString(), ContentType.APPLICATION_JSON));
String expected =
"test \n" +
"---------------\n" +
"test \n" +
"test \n";
"---------------\n" +
"test \n" +
"test \n";
Tuple<String, String> response = runSqlAsText("SELECT * FROM test");
logger.warn(expected);
logger.warn(response.v1());
@ -318,6 +318,11 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe
expected.put("size", 0);
expected.put("rows", emptyList());
assertResponse(expected, runSql(new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON)));
Map<String, Object> response = runSql("/close", new StringEntity("{\"cursor\":\"" + cursor + "\"}", ContentType.APPLICATION_JSON));
assertEquals(true, response.get("succeeded"));
assertEquals(0, getNumberOfSearchContexts("test"));
}
private Tuple<String, String> runSqlAsText(String sql) throws IOException {
@ -325,7 +330,7 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe
}
private Tuple<String, String> runSqlAsText(String suffix, HttpEntity sql) throws IOException {
Response response = client().performRequest("POST", "/_xpack/sql" + suffix, singletonMap("error_trace", "true"), sql);
Response response = client().performRequest("POST", "/_xpack/sql" + suffix, singletonMap("error_trace", "true"), sql);
return new Tuple<>(
Streams.copyToString(new InputStreamReader(response.getEntity().getContent(), StandardCharsets.UTF_8)),
response.getHeader("Cursor")
@ -340,4 +345,34 @@ public abstract class RestSqlTestCase extends ESRestTestCase implements ErrorsTe
}
}
public static int getNumberOfSearchContexts(String index) throws IOException {
Response response = client().performRequest("GET", "/_stats/search");
Map<String, Object> stats;
try (InputStream content = response.getEntity().getContent()) {
stats = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
return getOpenContexts(stats, index);
}
public static void assertNoSearchContexts() throws IOException {
Response response = client().performRequest("GET", "/_stats/search");
Map<String, Object> stats;
try (InputStream content = response.getEntity().getContent()) {
stats = XContentHelper.convertToMap(JsonXContent.jsonXContent, content, false);
}
@SuppressWarnings("unchecked")
Map<String, Object> indexStats = (Map<String, Object>) stats.get("indices");
for (String index : indexStats.keySet()) {
if (index.startsWith(".") == false) { // We are not interested in internal indices
assertEquals(index + " should have no search contexts", 0, getOpenContexts(stats, index));
}
}
}
@SuppressWarnings("unchecked")
public static int getOpenContexts(Map<String, Object> indexStats, String index) {
return (int) ((Map<String, Object>) ((Map<String, Object>) ((Map<String, Object>) ((Map<String, Object>)
indexStats.get("indices")).get(index)).get("total")).get("search")).get("open_contexts");
}
}

View File

@ -33,7 +33,8 @@ public final class Proto extends AbstractProto {
public enum RequestType implements AbstractProto.RequestType {
INFO(InfoRequest::new),
QUERY_INIT(QueryInitRequest::new),
QUERY_PAGE(QueryPageRequest::new);
QUERY_PAGE(QueryPageRequest::new),
QUERY_CLOSE(QueryCloseRequest::new);
private final RequestReader reader;
@ -64,7 +65,8 @@ public final class Proto extends AbstractProto {
public enum ResponseType implements AbstractProto.ResponseType {
INFO(InfoResponse::new),
QUERY_INIT(QueryInitResponse::new),
QUERY_PAGE(QueryPageResponse::new);
QUERY_PAGE(QueryPageResponse::new),
QUERY_CLOSE(QueryCloseResponse::new);
private final ResponseReader reader;

View File

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryCloseRequest;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import java.io.IOException;
public class QueryCloseRequest extends AbstractQueryCloseRequest {
public QueryCloseRequest(String cursor) {
super(cursor);
}
QueryCloseRequest(SqlDataInput in) throws IOException {
super(in);
}
@Override
public Proto.RequestType requestType() {
return Proto.RequestType.QUERY_CLOSE;
}
}

View File

@ -0,0 +1,34 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryCloseResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.IOException;
public class QueryCloseResponse extends AbstractQueryCloseResponse {
public QueryCloseResponse(boolean succeeded) {
super(succeeded);
}
QueryCloseResponse(Request request, DataInput in) throws IOException {
super(request, in);
}
@Override
public RequestType requestType() {
return RequestType.QUERY_CLOSE;
}
@Override
public ResponseType responseType() {
return ResponseType.QUERY_CLOSE;
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
public class QueryCloseRequestTests extends ESTestCase {
static QueryCloseRequest randomQueryCloseRequest() {
String cursor = randomAlphaOfLength(10);
return new QueryCloseRequest(cursor);
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomQueryCloseRequest());
}
public void testToString() {
assertEquals("QueryCloseRequest<0320>", new QueryCloseRequest("0320").toString());
}
}

View File

@ -0,0 +1,28 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.cli.net.protocol;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.cli.net.protocol.CliRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseRequestTests.randomQueryCloseRequest;
public class QueryCloseResponseTests extends ESTestCase {
static QueryCloseResponse randomQueryCloseResponse() {
return new QueryCloseResponse(randomBoolean());
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomQueryCloseRequest(), randomQueryCloseResponse());
}
public void testToString() {
assertEquals("QueryCloseResponse<true>",
new QueryCloseResponse(true).toString());
}
}

View File

@ -8,6 +8,8 @@ package org.elasticsearch.xpack.sql.cli;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse;
@ -22,6 +24,8 @@ import java.security.AccessController;
import java.security.PrivilegedAction;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collections;
import java.util.Map;
import java.util.TimeZone;
public class CliHttpClient {
@ -47,6 +51,11 @@ public class CliHttpClient {
return (QueryResponse) post(request);
}
public QueryCloseResponse queryClose(String cursor) throws SQLException {
QueryCloseRequest request = new QueryCloseRequest(cursor);
return (QueryCloseResponse) post(request);
}
private TimeoutInfo timeout() {
long clientTime = Instant.now().toEpochMilli();
return new TimeoutInfo(clientTime, cfg.queryTimeout(), cfg.pageTimeout());

View File

@ -5,6 +5,7 @@
*/
package org.elasticsearch.xpack.sql.cli.command;
import org.elasticsearch.xpack.sql.cli.CliHttpClient;
import org.elasticsearch.xpack.sql.cli.CliTerminal;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryResponse;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection;
@ -20,42 +21,41 @@ public class ServerQueryCliCommand extends AbstractServerCliCommand {
@Override
protected boolean doHandle(CliTerminal terminal, CliSession cliSession, String line) {
QueryResponse response;
QueryResponse response = null;
CliHttpClient cliClient = cliSession.getClient();
try {
response = cliSession.getClient().queryInit(line, cliSession.getFetchSize());
response = cliClient.queryInit(line, cliSession.getFetchSize());
if (response.data.startsWith("digraph ")) {
handleGraphviz(terminal, response.data);
return true;
}
while (true) {
handleText(terminal, response.data);
if (response.cursor().isEmpty()) {
// Successfully finished the entire query!
terminal.flush();
return true;
}
if (false == cliSession.getFetchSeparator().equals("")) {
terminal.println(cliSession.getFetchSeparator());
}
response = cliSession.getClient().nextPage(response.cursor());
}
} catch (SQLException e) {
if (JreHttpUrlConnection.SQL_STATE_BAD_SERVER.equals(e.getSQLState())) {
terminal.error("Server error", e.getMessage());
} else {
terminal.error("Bad request", e.getMessage());
}
return true;
}
if (response.data.startsWith("digraph ")) {
handleGraphviz(terminal, response.data);
return true;
}
while (true) {
handleText(terminal, response.data);
if (response.cursor().isEmpty()) {
// Successfully finished the entire query!
terminal.flush();
return true;
}
if (false == cliSession.getFetchSeparator().equals("")) {
terminal.println(cliSession.getFetchSeparator());
}
try {
response = cliSession.getClient().nextPage(response.cursor());
} catch (SQLException e) {
if (JreHttpUrlConnection.SQL_STATE_BAD_SERVER.equals(e.getSQLState())) {
terminal.error("Server error", e.getMessage());
} else {
terminal.error("Bad request", e.getMessage());
if (response != null && response.cursor().isEmpty() == false) {
try {
cliClient.queryClose(response.cursor());
} catch (SQLException ex) {
terminal.error("Could not close cursor", ex.getMessage());
}
return true;
}
}
return true;
}
private void handleText(CliTerminal terminal, String str) {

View File

@ -8,6 +8,7 @@ package org.elasticsearch.xpack.sql.cli.command;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.cli.CliHttpClient;
import org.elasticsearch.xpack.sql.cli.TestTerminal;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse;
@ -81,4 +82,21 @@ public class ServerQueryCliCommandTests extends ESTestCase {
verifyNoMoreInteractions(client);
}
public void testCursorCleanupOnError() throws Exception {
TestTerminal testTerminal = new TestTerminal();
CliHttpClient client = mock(CliHttpClient.class);
CliSession cliSession = new CliSession(client);
cliSession.setFetchSize(15);
when(client.queryInit("test query", 15)).thenReturn(new QueryInitResponse(123, "my_cursor1", "first"));
when(client.nextPage("my_cursor1")).thenThrow(new SQLException("test exception"));
when(client.queryClose("my_cursor1")).thenReturn(new QueryCloseResponse(true));
ServerQueryCliCommand cliCommand = new ServerQueryCliCommand();
assertTrue(cliCommand.handle(testTerminal, cliSession, "test query"));
assertEquals("first<b>Bad request [</b><i>test exception</i><b>]</b>\n", testTerminal.toString());
verify(client, times(1)).queryInit(eq("test query"), eq(15));
verify(client, times(1)).nextPage(any());
verify(client, times(1)).queryClose(eq("my_cursor1"));
verifyNoMoreInteractions(client);
}
}

View File

@ -36,7 +36,7 @@ public final class Proto extends AbstractProto {
META_COLUMN(MetaColumnRequest::new),
QUERY_INIT(QueryInitRequest::new),
QUERY_PAGE(QueryPageRequest::new),
// QUERY_CLOSE(QueryClosenRequest::new), TODO implement me
QUERY_CLOSE(QueryCloseRequest::new)
;
private final RequestReader reader;
@ -71,7 +71,7 @@ public final class Proto extends AbstractProto {
META_COLUMN(MetaColumnResponse::new),
QUERY_INIT(QueryInitResponse::new),
QUERY_PAGE(QueryPageResponse::new),
// QUERY_CLOSE(QueryClosenResponse::new) TODO implement me
QUERY_CLOSE(QueryCloseResponse::new)
;
private final ResponseReader reader;

View File

@ -0,0 +1,32 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryCloseRequest;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryPageRequest;
import org.elasticsearch.xpack.sql.protocol.shared.Nullable;
import org.elasticsearch.xpack.sql.protocol.shared.SqlDataInput;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.IOException;
public class QueryCloseRequest extends AbstractQueryCloseRequest {
public QueryCloseRequest(String cursor) {
super(cursor);
}
QueryCloseRequest(SqlDataInput in) throws IOException {
super(in);
}
@Override
public RequestType requestType() {
return RequestType.QUERY_CLOSE;
}
}

View File

@ -0,0 +1,35 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.ResponseType;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractInfoResponse;
import org.elasticsearch.xpack.sql.protocol.shared.AbstractQueryCloseResponse;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
import java.io.DataInput;
import java.io.IOException;
public class QueryCloseResponse extends AbstractQueryCloseResponse {
public QueryCloseResponse(boolean succeeded) {
super(succeeded);
}
QueryCloseResponse(Request request, DataInput in) throws IOException {
super(request, in);
}
@Override
public RequestType requestType() {
return RequestType.QUERY_CLOSE;
}
@Override
public ResponseType responseType() {
return ResponseType.QUERY_CLOSE;
}
}

View File

@ -0,0 +1,29 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.protocol.shared.TimeoutInfo;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.randomTimeoutInfo;
public class QueryCloseRequestTests extends ESTestCase {
static QueryCloseRequest randomQueryCloseRequest() {
String cursor = randomAlphaOfLength(10);
return new QueryCloseRequest(cursor);
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(randomQueryCloseRequest());
}
public void testToString() {
assertEquals("QueryCloseRequest<123>", new QueryCloseRequest("123").toString());
}
}

View File

@ -0,0 +1,26 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.jdbc.net.protocol;
import org.elasticsearch.test.ESTestCase;
import java.io.IOException;
import static org.elasticsearch.xpack.sql.jdbc.net.protocol.JdbcRoundTripTestUtils.assertRoundTripCurrentVersion;
public class QueryCloseResponseTests extends ESTestCase {
static QueryCloseResponse randomQueryCloseResponse() {
return new QueryCloseResponse(randomBoolean());
}
public void testRoundTrip() throws IOException {
assertRoundTripCurrentVersion(QueryCloseRequestTests::randomQueryCloseRequest, randomQueryCloseResponse());
}
public void testToString() {
assertEquals("QueryCloseResponse<true>", new QueryCloseResponse(true).toString());
}
}

View File

@ -1257,5 +1257,10 @@ class JdbcDatabaseMetaData implements DatabaseMetaData, JdbcWrapper {
public int batchSize() {
return data.length;
}
@Override
public void close() throws SQLException {
// this cursor doesn't hold any resource - no need to clean up
}
}
}

View File

@ -109,12 +109,13 @@ class JdbcResultSet implements ResultSet, JdbcWrapper {
}
@Override
public void close() {
public void close() throws SQLException {
if (!closed) {
closed = true;
if (statement != null) {
statement.resultSetWasClosed();
}
cursor.close();
}
}

View File

@ -48,7 +48,7 @@ class JdbcStatement implements Statement, JdbcWrapper {
}
@Override
public void close() {
public void close() throws SQLException {
if (!closed) {
closed = true;
closeResultSet();
@ -382,7 +382,7 @@ class JdbcStatement implements Statement, JdbcWrapper {
}
}
protected final void closeResultSet() {
protected final void closeResultSet() throws SQLException {
if (rs != null) {
ignoreResultSetClose = true;
try {
@ -394,7 +394,7 @@ class JdbcStatement implements Statement, JdbcWrapper {
}
}
final void resultSetWasClosed() {
final void resultSetWasClosed() throws SQLException {
if (closeOnCompletion && !ignoreResultSetClose) {
close();
}

View File

@ -27,4 +27,6 @@ public interface Cursor {
* server in the current batch.
*/
int batchSize();
void close() throws SQLException;
}

View File

@ -57,4 +57,11 @@ class DefaultCursor implements Cursor {
public int batchSize() {
return page.rows();
}
@Override
public void close() throws SQLException {
if (cursor.isEmpty() == false) {
client.queryClose(cursor);
}
}
}

View File

@ -8,7 +8,6 @@ package org.elasticsearch.xpack.sql.jdbc.net.client;
import org.elasticsearch.xpack.sql.client.shared.ClientException;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection;
import org.elasticsearch.xpack.sql.client.shared.JreHttpUrlConnection.ResponseOrException;
import org.elasticsearch.xpack.sql.jdbc.JdbcException;
import org.elasticsearch.xpack.sql.jdbc.JdbcSQLException;
import org.elasticsearch.xpack.sql.jdbc.jdbc.JdbcConfiguration;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
@ -60,4 +59,5 @@ class HttpClient {
throw new JdbcSQLException(ex, "Transport failure");
}
}
}

View File

@ -14,6 +14,8 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaColumnResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Page;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
@ -24,7 +26,9 @@ import java.io.DataInput;
import java.io.IOException;
import java.sql.SQLException;
import java.time.Instant;
import java.util.Collections;
import java.util.List;
import java.util.Map;
public class JdbcHttpClient {
@FunctionalInterface
@ -69,6 +73,11 @@ public class JdbcHttpClient {
return ((QueryPageResponse) http.post(request)).cursor();
}
public boolean queryClose(String cursor) throws SQLException {
QueryCloseRequest request = new QueryCloseRequest(cursor);
return ((QueryCloseResponse) http.post(request)).succeeded();
}
public InfoResponse serverInfo() throws SQLException {
if (serverInfo == null) {
serverInfo = fetchServerInfo();

View File

@ -71,4 +71,8 @@ public class PlanExecutor {
public void nextPage(Configuration cfg, Cursor cursor, ActionListener<RowSet> listener) {
cursor.nextPage(cfg, client, listener);
}
public void cleanCursor(Configuration cfg, Cursor cursor, ActionListener<Boolean> listener) {
cursor.clear(cfg, client, listener);
}
}

View File

@ -22,6 +22,7 @@ import java.util.Set;
*/
abstract class AbstractSearchHitRowSet extends AbstractRowSet {
private final SearchHit[] hits;
private final Cursor cursor;
private final String scrollId;
private final List<HitExtractor> extractors;
private final Set<String> innerHits = new LinkedHashSet<>();
@ -75,6 +76,21 @@ abstract class AbstractSearchHitRowSet extends AbstractRowSet {
size = limitHits < 0 ? sz : Math.min(sz, limitHits);
indexPerLevel = new int[maxDepth + 1];
this.innerHit = innerHit;
if (scrollId == null) {
/* SearchResponse can contain a null scroll when you start a
* scroll but all results fit in the first page. */
cursor = Cursor.EMPTY;
} else {
// compute remaining limit
int remainingLimit = limit - size;
// if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached
if (size == 0 || remainingLimit == 0) {
cursor = Cursor.EMPTY;
} else {
cursor = new ScrollCursor(scrollId, extractors, remainingLimit);
}
}
}
@Override
@ -152,17 +168,6 @@ abstract class AbstractSearchHitRowSet extends AbstractRowSet {
@Override
public Cursor nextPageCursor() {
if (scrollId == null) {
/* SearchResponse can contain a null scroll when you start a
* scroll but all results fit in the first page. */
return Cursor.EMPTY;
}
// compute remaining limit
int remainingLimit = limit - size;
// if the computed limit is zero, or the size is zero it means either there's nothing left or the limit has been reached
if (size == 0 || remainingLimit == 0) {
return Cursor.EMPTY;
}
return new ScrollCursor(scrollId, extractors, remainingLimit);
return cursor;
}
}

View File

@ -7,6 +7,8 @@ package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequest;
import org.elasticsearch.client.Client;
@ -57,12 +59,25 @@ public class ScrollCursor implements Cursor {
public void nextPage(Configuration cfg, Client client, ActionListener<RowSet> listener) {
SearchScrollRequest request = new SearchScrollRequest(scrollId).scroll(cfg.pageTimeout());
client.searchScroll(request, ActionListener.wrap((SearchResponse response) -> {
int limitHits = limit;
listener.onResponse(new ScrolledSearchHitRowSet(extractors, response.getHits().getHits(),
limitHits, response.getScrollId()));
ScrolledSearchHitRowSet rowSet = new ScrolledSearchHitRowSet(extractors, response.getHits().getHits(),
limit, response.getScrollId());
if (rowSet.nextPageCursor() == Cursor.EMPTY ) {
// we are finished with this cursor, let's clean it before continuing
clear(cfg, client, ActionListener.wrap(success -> listener.onResponse(rowSet), listener::onFailure));
} else {
listener.onResponse(rowSet);
}
}, listener::onFailure));
}
@Override
public void clear(Configuration cfg, Client client, ActionListener<Boolean> listener) {
cleanCursor(client, scrollId,
ActionListener.wrap(
clearScrollResponse -> listener.onResponse(clearScrollResponse.isSucceeded()),
listener::onFailure));
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
@ -83,4 +98,11 @@ public class ScrollCursor implements Cursor {
public String toString() {
return "cursor for scroll [" + scrollId + "]";
}
public static void cleanCursor(Client client, String scrollId, ActionListener<ClearScrollResponse> listener) {
ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId);
client.clearScroll(clearScrollRequest, listener);
}
}

View File

@ -110,7 +110,7 @@ public class Scroller {
}
@Override
protected SchemaRowSet handleResponse(SearchResponse response) {
protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
final List<Object[]> extractedAggs = new ArrayList<>();
AggValues aggValues = new AggValues(extractedAggs);
@ -156,9 +156,9 @@ public class Scroller {
}
aggValues.init(maxDepth, query.limit());
clearScroll(response.getScrollId());
return new AggsRowSet(schema, aggValues, aggColumns);
clearScroll(response.getScrollId(), ActionListener.wrap(
succeeded -> listener.onResponse(new AggsRowSet(schema, aggValues, aggColumns)),
listener::onFailure));
}
private Object[] extractAggValue(ColumnReference col, SearchResponse response) {
@ -230,7 +230,8 @@ public class Scroller {
super.onResponse(response);
}
protected SchemaRowSet handleResponse(SearchResponse response) {
@Override
protected void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener) {
SearchHit[] hits = response.getHits().getHits();
List<HitExtractor> exts = getExtractors();
@ -239,23 +240,23 @@ public class Scroller {
String scrollId = response.getScrollId();
// if there's an id, try to setup next scroll
if (scrollId != null) {
if (scrollId != null &&
// is all the content already retrieved?
if (Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length
(Boolean.TRUE.equals(response.isTerminatedEarly()) || response.getHits().getTotalHits() == hits.length
// or maybe the limit has been reached
|| (hits.length >= query.limit() && query.limit() > -1)) {
|| (hits.length >= query.limit() && query.limit() > -1))) {
// if so, clear the scroll
clearScroll(scrollId);
// and remove it to indicate no more data is expected
scrollId = null;
}
clearScroll(response.getScrollId(), ActionListener.wrap(
succeeded -> listener.onResponse(new InitialSearchHitRowSet(schema, exts, hits, query.limit(), null)),
listener::onFailure));
} else {
listener.onResponse(new InitialSearchHitRowSet(schema, exts, hits, query.limit(), scrollId));
}
return new InitialSearchHitRowSet(schema, exts, hits, query.limit(), scrollId);
}
// no hits
else {
clearScroll(response.getScrollId());
return Rows.empty(schema);
clearScroll(response.getScrollId(), ActionListener.wrap(succeeded -> listener.onResponse(Rows.empty(schema)),
listener::onFailure));
}
}
@ -321,18 +322,22 @@ public class Scroller {
if (!CollectionUtils.isEmpty(failure)) {
onFailure(new ExecutionException(failure[0].reason(), failure[0].getCause()));
}
listener.onResponse(handleResponse(response));
handleResponse(response, listener);
} catch (Exception ex) {
onFailure(ex);
}
}
protected abstract SchemaRowSet handleResponse(SearchResponse response);
protected abstract void handleResponse(SearchResponse response, ActionListener<SchemaRowSet> listener);
protected final void clearScroll(String scrollId) {
protected final void clearScroll(String scrollId, ActionListener<Boolean> listener) {
if (scrollId != null) {
// fire and forget
client.prepareClearScroll().addScrollId(scrollId).execute();
client.prepareClearScroll().addScrollId(scrollId).execute(
ActionListener.wrap(
clearScrollResponse -> listener.onResponse(clearScrollResponse.isSucceeded()),
listener::onFailure));
} else {
listener.onResponse(false);
}
}

View File

@ -61,6 +61,11 @@ public class CliFormatterCursor implements Cursor {
delegate.nextPage(cfg, client, listener);
}
@Override
public void clear(Configuration cfg, Client client, ActionListener<Boolean> listener) {
delegate.clear(cfg, client, listener);
}
@Override
public String getWriteableName() {
return NAME;

View File

@ -70,6 +70,11 @@ public class JdbcCursor implements Cursor {
delegate.nextPage(cfg, client, listener);
}
@Override
public void clear(Configuration cfg, Client client, ActionListener<Boolean> listener) {
delegate.clear(cfg, client, listener);
}
@Override
public String getWriteableName() {
return NAME;

View File

@ -16,10 +16,12 @@ import org.elasticsearch.rest.RestController;
import org.elasticsearch.xpack.sql.cli.net.protocol.InfoResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto;
import org.elasticsearch.xpack.sql.cli.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryCloseRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageRequest;
import org.elasticsearch.xpack.sql.cli.net.protocol.QueryPageResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseResponse;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlAction;
import org.elasticsearch.xpack.sql.plugin.sql.action.SqlRequest;
import org.elasticsearch.xpack.sql.protocol.shared.Request;
@ -64,6 +66,8 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
return queryInit(client, (QueryInitRequest) request);
case QUERY_PAGE:
return queryPage(client, (QueryPageRequest) request);
case QUERY_CLOSE:
return queryClose(client, (QueryCloseRequest) request);
default:
throw new IllegalArgumentException("Unsupported action [" + requestType + "]");
}
@ -102,4 +106,11 @@ public class RestSqlCliAction extends AbstractSqlProtocolRestAction {
Cursor.encodeToString(Version.CURRENT, CliFormatterCursor.wrap(response.cursor(), formatter)), data);
}));
}
private Consumer<RestChannel> queryClose(Client client, QueryCloseRequest request) {
Cursor cursor = Cursor.decodeFromString(request.cursor);
SqlClearCursorAction.Request sqlRequest = new SqlClearCursorAction.Request(cursor);
return channel -> client.execute(SqlClearCursorAction.INSTANCE, sqlRequest, toActionListener(channel,
response -> new QueryCloseResponse(response.isSucceeded())));
}
}

View File

@ -25,6 +25,8 @@ import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.MetaTableResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.Proto.RequestType;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryCloseResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitRequest;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryInitResponse;
import org.elasticsearch.xpack.sql.jdbc.net.protocol.QueryPageRequest;
@ -95,6 +97,8 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
return queryInit(client, (QueryInitRequest) request);
case QUERY_PAGE:
return queryPage(client, (QueryPageRequest) request);
case QUERY_CLOSE:
return queryClose(client, (QueryCloseRequest) request);
default:
throw new IllegalArgumentException("Unsupported action [" + requestType + "]");
}
@ -165,4 +169,11 @@ public class RestSqlJdbcAction extends AbstractSqlProtocolRestAction {
Cursor.encodeToString(Version.CURRENT, JdbcCursor.wrap(response.cursor(), types)),
new SqlResponsePayload(types, response.rows()))));
}
private Consumer<RestChannel> queryClose(Client client, QueryCloseRequest request) {
Cursor cursor = Cursor.decodeFromString(request.cursor);
SqlClearCursorAction.Request sqlRequest = new SqlClearCursorAction.Request(cursor);
return channel -> client.execute(SqlClearCursorAction.INSTANCE, sqlRequest, toActionListener(channel,
response -> new QueryCloseResponse(response.isSucceeded())));
}
}

View File

@ -0,0 +1,261 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.action.Action;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.action.ActionRequestBuilder;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.ActionResponse;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.client.node.NodeClient;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.ObjectParser;
import org.elasticsearch.common.xcontent.StatusToXContentObject;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentParser;
import org.elasticsearch.rest.BaseRestHandler;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.rest.action.RestToXContentListener;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.TransportService;
import org.elasticsearch.xpack.sql.execution.PlanExecutor;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import java.io.IOException;
import java.util.Objects;
import static org.elasticsearch.action.ValidateActions.addValidationError;
import static org.elasticsearch.rest.RestRequest.Method.POST;
import static org.elasticsearch.rest.RestStatus.NOT_FOUND;
import static org.elasticsearch.rest.RestStatus.OK;
public class SqlClearCursorAction
extends Action<SqlClearCursorAction.Request, SqlClearCursorAction.Response, SqlClearCursorAction.RequestBuilder> {
public static final SqlClearCursorAction INSTANCE = new SqlClearCursorAction();
public static final String NAME = "indices:data/read/sql/close_cursor";
private SqlClearCursorAction() {
super(NAME);
}
@Override
public RequestBuilder newRequestBuilder(ElasticsearchClient client) {
return new RequestBuilder(client, this);
}
@Override
public Response newResponse() {
return new Response();
}
public static class Request extends ActionRequest {
public static final ObjectParser<Request, Void> PARSER = new ObjectParser<>(NAME, Request::new);
public static final ParseField CURSOR = new ParseField("cursor");
static {
PARSER.declareString((request, nextPage) -> request.setCursor(Cursor.decodeFromString(nextPage)), CURSOR);
}
private Cursor cursor;
public Request() {
}
public Request(Cursor cursor) {
this.cursor = cursor;
}
@Override
public ActionRequestValidationException validate() {
ActionRequestValidationException validationException = null;
if (getCursor() == null) {
validationException = addValidationError("cursor is required", validationException);
}
return validationException;
}
public Cursor getCursor() {
return cursor;
}
public Request setCursor(Cursor cursor) {
this.cursor = cursor;
return this;
}
@Override
public String getDescription() {
return "SQL Clean cursor [" + getCursor() + "]";
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
cursor = in.readNamedWriteable(Cursor.class);
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeNamedWriteable(cursor);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Request request = (Request) o;
return Objects.equals(cursor, request.cursor);
}
@Override
public int hashCode() {
return Objects.hash(cursor);
}
}
public static class RequestBuilder extends ActionRequestBuilder<Request, Response, RequestBuilder> {
public RequestBuilder(ElasticsearchClient client, SqlClearCursorAction action, Cursor cursor) {
super(client, action, new Request(cursor));
}
public RequestBuilder(ElasticsearchClient client, SqlClearCursorAction action) {
super(client, action, new Request());
}
public RequestBuilder cursor(Cursor cursor) {
request.setCursor(cursor);
return this;
}
}
public static class Response extends ActionResponse implements StatusToXContentObject {
private static final ParseField SUCCEEDED = new ParseField("succeeded");
private boolean succeeded;
public Response(boolean succeeded) {
this.succeeded = succeeded;
}
Response() {
}
/**
* @return Whether the attempt to clear a cursor was successful.
*/
public boolean isSucceeded() {
return succeeded;
}
public Response setSucceeded(boolean succeeded) {
this.succeeded = succeeded;
return this;
}
@Override
public RestStatus status() {
return succeeded ? NOT_FOUND : OK;
}
@Override
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
builder.startObject();
builder.field(SUCCEEDED.getPreferredName(), succeeded);
builder.endObject();
return builder;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);
succeeded = in.readBoolean();
}
@Override
public void writeTo(StreamOutput out) throws IOException {
super.writeTo(out);
out.writeBoolean(succeeded);
}
@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
Response response = (Response) o;
return succeeded == response.succeeded;
}
@Override
public int hashCode() {
return Objects.hash(succeeded);
}
}
public static class TransportAction extends HandledTransportAction<Request, Response> {
private final PlanExecutor planExecutor;
private final SqlLicenseChecker sqlLicenseChecker;
@Inject
public TransportAction(Settings settings, ThreadPool threadPool,
TransportService transportService, ActionFilters actionFilters,
IndexNameExpressionResolver indexNameExpressionResolver,
PlanExecutor planExecutor,
SqlLicenseChecker sqlLicenseChecker) {
super(settings, NAME, threadPool, transportService, actionFilters,
indexNameExpressionResolver, Request::new);
this.planExecutor = planExecutor;
this.sqlLicenseChecker = sqlLicenseChecker;
}
@Override
protected void doExecute(Request request, ActionListener<Response> listener) {
sqlLicenseChecker.checkIfSqlAllowed();
Cursor cursor = request.getCursor();
planExecutor.cleanCursor(Configuration.DEFAULT, cursor, ActionListener.wrap(
success -> listener.onResponse(new Response(success)), listener::onFailure));
}
}
public static class RestAction extends BaseRestHandler {
public RestAction(Settings settings, RestController controller) {
super(settings);
controller.registerHandler(POST, "/_xpack/sql/close", this);
}
@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
Request sqlRequest;
try (XContentParser parser = request.contentOrSourceParamParser()) {
sqlRequest = Request.PARSER.apply(parser, null);
}
return channel -> client.executeLocally(SqlClearCursorAction.INSTANCE, sqlRequest, new RestToXContentListener<>(channel));
}
@Override
public String getName() {
return "sql_translate_action";
}
}
}

View File

@ -70,7 +70,8 @@ public class SqlPlugin implements ActionPlugin {
return Arrays.asList(new RestSqlAction(settings, restController),
new SqlTranslateAction.RestAction(settings, restController),
new RestSqlCliAction(settings, restController),
new RestSqlJdbcAction(settings, restController, sqlLicenseChecker, indexResolver));
new RestSqlJdbcAction(settings, restController, sqlLicenseChecker, indexResolver),
new SqlClearCursorAction.RestAction(settings, restController));
}
@Override
@ -80,6 +81,7 @@ public class SqlPlugin implements ActionPlugin {
}
return Arrays.asList(new ActionHandler<>(SqlAction.INSTANCE, TransportSqlAction.class),
new ActionHandler<>(SqlTranslateAction.INSTANCE, SqlTranslateAction.TransportAction.class));
new ActionHandler<>(SqlTranslateAction.INSTANCE, SqlTranslateAction.TransportAction.class),
new ActionHandler<>(SqlClearCursorAction.INSTANCE, SqlClearCursorAction.TransportAction.class));
}
}

View File

@ -74,7 +74,6 @@ public class SqlRequest extends AbstractSqlRequest {
return this;
}
@Override
public void readFrom(StreamInput in) throws IOException {
super.readFrom(in);

View File

@ -33,13 +33,13 @@ public class SqlRequestBuilder extends ActionRequestBuilder<SqlRequest, SqlRespo
return this;
}
public SqlRequestBuilder filter(QueryBuilder filter) {
request.filter(filter);
public SqlRequestBuilder cursor(Cursor cursor) {
request.cursor(cursor);
return this;
}
public SqlRequestBuilder nextPageKey(Cursor nextPageInfo) {
request.cursor(nextPageInfo);
public SqlRequestBuilder filter(QueryBuilder filter) {
request.filter(filter);
return this;
}
@ -57,4 +57,9 @@ public class SqlRequestBuilder extends ActionRequestBuilder<SqlRequest, SqlRespo
request.pageTimeout(timeout);
return this;
}
public SqlRequestBuilder fetchSize(int fetchSize) {
request.fetchSize(fetchSize);
return this;
}
}

View File

@ -42,6 +42,11 @@ public interface Cursor extends NamedWriteable {
*/
void nextPage(Configuration cfg, Client client, ActionListener<RowSet> listener);
/**
* Cleans the resources associated with the cursor
*/
void clear(Configuration cfg, Client client, ActionListener<Boolean> listener);
/**
* The {@link NamedWriteable}s required to deserialize {@link Cursor}s.
*/

View File

@ -34,6 +34,12 @@ class EmptyCursor implements Cursor {
throw new IllegalArgumentException("there is no next page");
}
@Override
public void clear(Configuration cfg, Client client, ActionListener<Boolean> listener) {
// There is nothing to clean
listener.onResponse(false);
}
@Override
public boolean equals(Object obj) {
return obj == this;

View File

@ -0,0 +1,52 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.execution.search;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.support.PlainActionFuture;
import org.elasticsearch.client.Client;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.xpack.sql.session.Configuration;
import org.elasticsearch.xpack.sql.session.Cursor;
import org.mockito.ArgumentCaptor;
import java.util.Collections;
import static org.elasticsearch.action.support.PlainActionFuture.newFuture;
import static org.hamcrest.Matchers.hasSize;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
public class CursorTests extends ESTestCase {
public void testEmptyCursorClearCursor() {
Client clientMock = mock(Client.class);
Cursor cursor = Cursor.EMPTY;
PlainActionFuture<Boolean> future = newFuture();
cursor.clear(Configuration.DEFAULT, clientMock, future);
assertFalse(future.actionGet());
verifyZeroInteractions(clientMock);
}
@SuppressWarnings("unchecked")
public void testScrollCursorClearCursor() {
Client clientMock = mock(Client.class);
ActionListener<Boolean> listenerMock = mock(ActionListener.class);
String cursorString = randomAlphaOfLength(10);
Cursor cursor = new ScrollCursor(cursorString, Collections.emptyList(), randomInt());
cursor.clear(Configuration.DEFAULT, clientMock, listenerMock);
ArgumentCaptor<ClearScrollRequest> request = ArgumentCaptor.forClass(ClearScrollRequest.class);
verify(clientMock).clearScroll(request.capture(), any(ActionListener.class));
assertEquals(Collections.singletonList(cursorString), request.getValue().getScrollIds());
verifyZeroInteractions(listenerMock);
}
}

View File

@ -0,0 +1,38 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils.MutateFunction;
import org.elasticsearch.xpack.sql.session.Cursor;
import static org.elasticsearch.xpack.sql.execution.search.ScrollCursorTests.randomScrollCursor;
public class SqlClearCursorRequestTests extends AbstractStreamableTestCase<SqlClearCursorAction.Request> {
@Override
protected SqlClearCursorAction.Request createTestInstance() {
return new SqlClearCursorAction.Request(randomScrollCursor());
}
@Override
protected SqlClearCursorAction.Request createBlankInstance() {
return new SqlClearCursorAction.Request();
}
@Override
@SuppressWarnings("unchecked")
protected MutateFunction<SqlClearCursorAction.Request> getMutateFunction() {
return request -> getCopyFunction().copy(request).setCursor(randomScrollCursor());
}
@Override
protected NamedWriteableRegistry getNamedWriteableRegistry() {
return new NamedWriteableRegistry(Cursor.getNamedWriteables());
}
}

View File

@ -0,0 +1,27 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.plugin;
import org.elasticsearch.test.AbstractStreamableTestCase;
import org.elasticsearch.test.EqualsHashCodeTestUtils.MutateFunction;
public class SqlClearCursorResponseTests extends AbstractStreamableTestCase<SqlClearCursorAction.Response> {
@Override
protected SqlClearCursorAction.Response createTestInstance() {
return new SqlClearCursorAction.Response(randomBoolean());
}
@Override
protected SqlClearCursorAction.Response createBlankInstance() {
return new SqlClearCursorAction.Response();
}
@Override
protected MutateFunction<SqlClearCursorAction.Response> getMutateFunction() {
return response -> getCopyFunction().copy(response).setSucceeded(response.isSucceeded() == false);
}
}

View File

@ -29,7 +29,6 @@ import java.sql.SQLInvalidAuthorizationSpecException;
import java.sql.SQLRecoverableException;
import java.sql.SQLSyntaxErrorException;
import java.sql.SQLTimeoutException;
import java.util.Arrays;
import java.util.Base64;
import java.util.function.Function;
import java.util.zip.GZIPInputStream;

View File

@ -0,0 +1,49 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.IOException;
import java.util.Objects;
public abstract class AbstractQueryCloseRequest extends Request {
public final String cursor;
protected AbstractQueryCloseRequest(String cursor) {
if (cursor == null) {
throw new IllegalArgumentException("[cursor] must not be null");
}
this.cursor = cursor;
}
protected AbstractQueryCloseRequest(SqlDataInput in) throws IOException {
this.cursor = in.readUTF();
}
@Override
public void writeTo(SqlDataOutput out) throws IOException {
out.writeUTF(cursor);
}
@Override
protected String toStringBody() {
return cursor;
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
AbstractQueryCloseRequest other = (AbstractQueryCloseRequest) obj;
return Objects.equals(cursor, other.cursor);
}
@Override
public int hashCode() {
return Objects.hash(cursor);
}
}

View File

@ -0,0 +1,57 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.xpack.sql.protocol.shared;
import java.io.DataInput;
import java.io.IOException;
import java.util.Objects;
/**
* Superclass for responses both for {@link AbstractQueryInitRequest}
* and {@link AbstractQueryPageRequest}.
*/
public abstract class AbstractQueryCloseResponse extends Response {
private final boolean succeeded;
protected AbstractQueryCloseResponse(boolean succeeded) {
this.succeeded = succeeded;
}
protected AbstractQueryCloseResponse(Request request, DataInput in) throws IOException {
succeeded = in.readBoolean();
}
@Override
protected void writeTo(SqlDataOutput out) throws IOException {
out.writeBoolean(succeeded);
}
/**
* True if the cursor was really closed
*/
public boolean succeeded() {
return succeeded;
}
@Override
protected String toStringBody() {
return Boolean.toString(succeeded);
}
@Override
public boolean equals(Object obj) {
if (obj == null || obj.getClass() != getClass()) {
return false;
}
AbstractQueryCloseResponse other = (AbstractQueryCloseResponse) obj;
return succeeded == other.succeeded;
}
@Override
public int hashCode() {
return Objects.hash(succeeded);
}
}