diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java index 7b6be85140f..2e2b375aa14 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java @@ -88,11 +88,7 @@ public class ClientScrollableHitSource extends ScrollableHitSource { } @Override - public void clearScroll(String scrollId) { - /* - * Fire off the clear scroll but don't wait for it it return before - * we send the use their response. - */ + public void clearScroll(String scrollId, Runnable onCompletion) { ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); clearScrollRequest.addScrollId(scrollId); /* @@ -103,15 +99,22 @@ public class ClientScrollableHitSource extends ScrollableHitSource { @Override public void onResponse(ClearScrollResponse response) { logger.debug("Freed [{}] contexts", response.getNumFreed()); + onCompletion.run(); } @Override public void onFailure(Exception e) { logger.warn((Supplier) () -> new ParameterizedMessage("Failed to clear scroll [{}]", scrollId), e); + onCompletion.run(); } }); } + @Override + protected void cleanup() { + // Nothing to do + } + /** * Run a search action and call onResponse when a the response comes in, retrying if the action fails with an exception caused by * rejected execution. diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java index bf13d6d72e2..1945c6e2f76 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/ScrollableHitSource.java @@ -83,13 +83,24 @@ public abstract class ScrollableHitSource implements Closeable { protected abstract void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer onResponse); @Override - public void close() { + public final void close() { String scrollId = this.scrollId.get(); if (Strings.hasLength(scrollId)) { - clearScroll(scrollId); + clearScroll(scrollId, this::cleanup); + } else { + cleanup(); } } - protected abstract void clearScroll(String scrollId); + /** + * Called to clear a scroll id. + * @param scrollId the id to clear + * @param onCompletion implementers must call this after completing the clear whether they are successful or not + */ + protected abstract void clearScroll(String scrollId, Runnable onCompletion); + /** + * Called after the process has been totally finished to clean up any resources the process needed like remote connections. + */ + protected abstract void cleanup(); /** * Set the id of the last scroll. Used for debugging. diff --git a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java index cb8f543756b..03a90bba815 100644 --- a/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java +++ b/modules/reindex/src/main/java/org/elasticsearch/index/reindex/remote/RemoteScrollableHitSource.java @@ -81,21 +81,6 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { this.client = client; } - @Override - public void close() { - super.close(); - /* This might be called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail - * to close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. That way we - * never close the client in its own thread pool. */ - threadPool.generic().submit(() -> { - try { - client.close(); - } catch (IOException e) { - logger.error("Failed to shutdown the remote connection", e); - } - }); - } - @Override protected void doStart(Consumer onResponse) { lookupRemoteVersion(version -> { @@ -125,17 +110,32 @@ public class RemoteScrollableHitSource extends ScrollableHitSource { } @Override - protected void clearScroll(String scrollId) { - // Need to throw out response.... + protected void clearScroll(String scrollId, Runnable onCompletion) { client.performRequestAsync("DELETE", scrollPath(), emptyMap(), scrollEntity(scrollId), new ResponseListener() { @Override public void onSuccess(org.elasticsearch.client.Response response) { logger.debug("Successfully cleared [{}]", scrollId); + onCompletion.run(); } @Override public void onFailure(Exception t) { logger.warn((Supplier) () -> new ParameterizedMessage("Failed to clear scroll [{}]", scrollId), t); + onCompletion.run(); + } + }); + } + + @Override + protected void cleanup() { + /* This is called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail to + * close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. */ + threadPool.generic().submit(() -> { + try { + client.close(); + logger.info("Shut down remote connection"); + } catch (IOException e) { + logger.error("Failed to shutdown the remote connection", e); } }); } diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml index 403382622f1..3557cf9bad7 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/10_basic.yaml @@ -26,6 +26,13 @@ - is_false: task - is_false: deleted + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "Response format for updated": - do: @@ -60,6 +67,13 @@ - is_false: task - is_false: deleted + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "wait_for_completion=false": - do: @@ -110,6 +124,13 @@ - is_false: response.task - is_false: response.deleted + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "Response format for version conflict": - do: @@ -151,6 +172,13 @@ - match: {failures.0.cause.index: dest} - gte: { took: 0 } + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "Response format for version conflict with conflicts=proceed": - do: @@ -185,210 +213,12 @@ - match: {throttled_millis: 0} - gte: { took: 0 } ---- -"Simplest example in docs": + # Make sure reindex closed all the scroll contexts - do: - index: - index: twitter - type: tweet - id: 1 - body: { "user": "kimchy" } - - do: - indices.refresh: {} - - - do: - reindex: - refresh: true - body: - source: - index: twitter - dest: - index: new_twitter - - - do: - search: - index: new_twitter - - match: { hits.total: 1 } - ---- -"Limit by type example in docs": - - do: - index: - index: twitter - type: tweet - id: 1 - body: { "user": "kimchy" } - - do: - index: - index: twitter - type: junk - id: 1 - body: { "user": "kimchy" } - - do: - indices.refresh: {} - - - do: - reindex: - refresh: true - body: - source: - index: twitter - type: tweet - dest: - index: new_twitter - - - do: - search: - index: new_twitter - - match: { hits.total: 1 } - ---- -"Limit by query example in docs": - - do: - index: - index: twitter - type: tweet - id: 1 - body: { "user": "kimchy" } - - do: - index: - index: twitter - type: tweet - id: 2 - body: { "user": "junk" } - - do: - indices.refresh: {} - - - do: - reindex: - refresh: true - body: - source: - index: twitter - query: - match: - user: kimchy - dest: - index: new_twitter - - - do: - search: - index: new_twitter - - match: { hits.total: 1 } - ---- -"Override type example in docs": - - do: - index: - index: twitter - type: tweet - id: 1 - body: { "user": "kimchy" } - - do: - index: - index: twitter - type: junk - id: 1 - body: { "user": "kimchy" } - - do: - indices.refresh: {} - - - do: - reindex: - refresh: true - body: - source: - index: twitter - type: tweet - dest: - index: new_twitter - type: chirp - - - do: - search: - index: new_twitter - type: chirp - - match: { hits.total: 1 } - ---- -"Multi index, multi type example from docs": - - do: - index: - index: twitter - type: tweet - id: 1 - body: { "user": "kimchy" } - - do: - index: - index: blog - type: post - id: 1 - body: { "user": "kimchy" } - - do: - indices.refresh: {} - - - do: - reindex: - refresh: true - body: - source: - index: [twitter, blog] - type: [tweet, post] - dest: - index: all_together - - - do: - search: - index: all_together - type: tweet - body: - query: - match: - user: kimchy - - match: { hits.total: 1 } - - - do: - search: - index: all_together - type: post - body: - query: - match: - user: kimchy - - match: { hits.total: 1 } - ---- -"Limit by size example from docs": - - do: - index: - index: twitter - type: tweet - id: 1 - body: { "user": "kimchy" } - - do: - index: - index: twitter - type: tweet - id: 2 - body: { "user": "kimchy" } - - do: - indices.refresh: {} - - - do: - reindex: - refresh: true - body: - size: 1 - source: - index: twitter - dest: - index: new_twitter - - - do: - search: - index: new_twitter - type: tweet - - match: { hits.total: 1 } + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} --- "Source document without any fields works": @@ -417,6 +247,13 @@ id: 1 - match: { _source: {} } + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "Reindex with source filtering": - do: @@ -455,3 +292,10 @@ id: 1 - match: { _source.text: "test" } - is_false: _source.filtered + + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} diff --git a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/90_remote.yaml b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/90_remote.yaml index 7b10a4612e6..576447b4e54 100644 --- a/modules/reindex/src/test/resources/rest-api-spec/test/reindex/90_remote.yaml +++ b/modules/reindex/src/test/resources/rest-api-spec/test/reindex/90_remote.yaml @@ -46,6 +46,13 @@ text: test - match: {hits.total: 1} + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "Reindex from remote with query": - do: @@ -95,6 +102,13 @@ match_all: {} - match: {hits.total: 1} + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "Reindex from remote with routing": - do: @@ -137,6 +151,13 @@ text: test - match: {hits.total: 1} + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "Reindex from remote with parent/child": - do: @@ -206,6 +227,13 @@ text: test - match: {hits.total: 1} + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "Reindex from remote with timeouts": # Validates that you can configure the socket_timeout and connect_timeout, @@ -258,6 +286,76 @@ text: test - match: {hits.total: 1} + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + +--- +"Reindex from remote with size": + - do: + index: + index: source + type: foo + id: 1 + body: { "text": "test" } + refresh: true + - do: + index: + index: source + type: foo + id: 2 + body: { "text": "test" } + refresh: true + + # Fetch the http host. We use the host of the master because we know there will always be a master. + - do: + cluster.state: {} + - set: { master_node: master } + - do: + nodes.info: + metric: [ http ] + - is_true: nodes.$master.http.publish_address + - set: {nodes.$master.http.publish_address: host} + - do: + reindex: + refresh: true + body: + size: 1 + source: + remote: + host: http://${host} + index: source + dest: + index: dest + - match: {created: 1} + - match: {updated: 0} + - match: {version_conflicts: 0} + - match: {batches: 1} + - match: {failures: []} + - match: {throttled_millis: 0} + - gte: { took: 0 } + - is_false: task + - is_false: deleted + + - do: + search: + index: dest + body: + query: + match: + text: test + - match: {hits.total: 1} + + # Make sure reindex closed all the scroll contexts + - do: + indices.stats: + index: source + metric: search + - match: {indices.source.total.search.open_contexts: 0} + --- "Reindex from remote with broken query": - do: