Fix reindex from remote clearing scroll (#22525)

Reindex-from-remote had a race when it tried to clear the scroll. It
first starts the request to clear the scroll and then submits a task
to the generic threadpool to shutdown the client. These two things
race and, in my experience, closing the scroll generally loses. That
means that most of the time reindex-from-remote isn't clearing the
scrolls that it uses. This isn't the end of the world because we
flush old scroll contexts after a while but this isn't great.

Noticed while experimenting with #22514.
This commit is contained in:
Nik Everett 2017-01-10 10:30:23 -05:00 committed by GitHub
parent 75d5b3d9eb
commit 78bb56671e
5 changed files with 184 additions and 228 deletions

View File

@ -88,11 +88,7 @@ public class ClientScrollableHitSource extends ScrollableHitSource {
} }
@Override @Override
public void clearScroll(String scrollId) { public void clearScroll(String scrollId, Runnable onCompletion) {
/*
* Fire off the clear scroll but don't wait for it it return before
* we send the use their response.
*/
ClearScrollRequest clearScrollRequest = new ClearScrollRequest(); ClearScrollRequest clearScrollRequest = new ClearScrollRequest();
clearScrollRequest.addScrollId(scrollId); clearScrollRequest.addScrollId(scrollId);
/* /*
@ -103,15 +99,22 @@ public class ClientScrollableHitSource extends ScrollableHitSource {
@Override @Override
public void onResponse(ClearScrollResponse response) { public void onResponse(ClearScrollResponse response) {
logger.debug("Freed [{}] contexts", response.getNumFreed()); logger.debug("Freed [{}] contexts", response.getNumFreed());
onCompletion.run();
} }
@Override @Override
public void onFailure(Exception e) { public void onFailure(Exception e) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to clear scroll [{}]", scrollId), e); logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to clear scroll [{}]", scrollId), e);
onCompletion.run();
} }
}); });
} }
@Override
protected void cleanup() {
// Nothing to do
}
/** /**
* Run a search action and call onResponse when a the response comes in, retrying if the action fails with an exception caused by * Run a search action and call onResponse when a the response comes in, retrying if the action fails with an exception caused by
* rejected execution. * rejected execution.

View File

@ -83,13 +83,24 @@ public abstract class ScrollableHitSource implements Closeable {
protected abstract void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse); protected abstract void doStartNextScroll(String scrollId, TimeValue extraKeepAlive, Consumer<? super Response> onResponse);
@Override @Override
public void close() { public final void close() {
String scrollId = this.scrollId.get(); String scrollId = this.scrollId.get();
if (Strings.hasLength(scrollId)) { if (Strings.hasLength(scrollId)) {
clearScroll(scrollId); clearScroll(scrollId, this::cleanup);
} else {
cleanup();
} }
} }
protected abstract void clearScroll(String scrollId); /**
* Called to clear a scroll id.
* @param scrollId the id to clear
* @param onCompletion implementers must call this after completing the clear whether they are successful or not
*/
protected abstract void clearScroll(String scrollId, Runnable onCompletion);
/**
* Called after the process has been totally finished to clean up any resources the process needed like remote connections.
*/
protected abstract void cleanup();
/** /**
* Set the id of the last scroll. Used for debugging. * Set the id of the last scroll. Used for debugging.

View File

@ -81,21 +81,6 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
this.client = client; this.client = client;
} }
@Override
public void close() {
super.close();
/* This might be called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail
* to close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. That way we
* never close the client in its own thread pool. */
threadPool.generic().submit(() -> {
try {
client.close();
} catch (IOException e) {
logger.error("Failed to shutdown the remote connection", e);
}
});
}
@Override @Override
protected void doStart(Consumer<? super Response> onResponse) { protected void doStart(Consumer<? super Response> onResponse) {
lookupRemoteVersion(version -> { lookupRemoteVersion(version -> {
@ -125,17 +110,32 @@ public class RemoteScrollableHitSource extends ScrollableHitSource {
} }
@Override @Override
protected void clearScroll(String scrollId) { protected void clearScroll(String scrollId, Runnable onCompletion) {
// Need to throw out response....
client.performRequestAsync("DELETE", scrollPath(), emptyMap(), scrollEntity(scrollId), new ResponseListener() { client.performRequestAsync("DELETE", scrollPath(), emptyMap(), scrollEntity(scrollId), new ResponseListener() {
@Override @Override
public void onSuccess(org.elasticsearch.client.Response response) { public void onSuccess(org.elasticsearch.client.Response response) {
logger.debug("Successfully cleared [{}]", scrollId); logger.debug("Successfully cleared [{}]", scrollId);
onCompletion.run();
} }
@Override @Override
public void onFailure(Exception t) { public void onFailure(Exception t) {
logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to clear scroll [{}]", scrollId), t); logger.warn((Supplier<?>) () -> new ParameterizedMessage("Failed to clear scroll [{}]", scrollId), t);
onCompletion.run();
}
});
}
@Override
protected void cleanup() {
/* This is called on the RestClient's thread pool and attempting to close the client on its own threadpool causes it to fail to
* close. So we always shutdown the RestClient asynchronously on a thread in Elasticsearch's generic thread pool. */
threadPool.generic().submit(() -> {
try {
client.close();
logger.info("Shut down remote connection");
} catch (IOException e) {
logger.error("Failed to shutdown the remote connection", e);
} }
}); });
} }

View File

@ -26,6 +26,13 @@
- is_false: task - is_false: task
- is_false: deleted - is_false: deleted
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"Response format for updated": "Response format for updated":
- do: - do:
@ -60,6 +67,13 @@
- is_false: task - is_false: task
- is_false: deleted - is_false: deleted
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"wait_for_completion=false": "wait_for_completion=false":
- do: - do:
@ -110,6 +124,13 @@
- is_false: response.task - is_false: response.task
- is_false: response.deleted - is_false: response.deleted
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"Response format for version conflict": "Response format for version conflict":
- do: - do:
@ -151,6 +172,13 @@
- match: {failures.0.cause.index: dest} - match: {failures.0.cause.index: dest}
- gte: { took: 0 } - gte: { took: 0 }
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"Response format for version conflict with conflicts=proceed": "Response format for version conflict with conflicts=proceed":
- do: - do:
@ -185,210 +213,12 @@
- match: {throttled_millis: 0} - match: {throttled_millis: 0}
- gte: { took: 0 } - gte: { took: 0 }
--- # Make sure reindex closed all the scroll contexts
"Simplest example in docs":
- do: - do:
index: indices.stats:
index: twitter index: source
type: tweet metric: search
id: 1 - match: {indices.source.total.search.open_contexts: 0}
body: { "user": "kimchy" }
- do:
indices.refresh: {}
- do:
reindex:
refresh: true
body:
source:
index: twitter
dest:
index: new_twitter
- do:
search:
index: new_twitter
- match: { hits.total: 1 }
---
"Limit by type example in docs":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
index:
index: twitter
type: junk
id: 1
body: { "user": "kimchy" }
- do:
indices.refresh: {}
- do:
reindex:
refresh: true
body:
source:
index: twitter
type: tweet
dest:
index: new_twitter
- do:
search:
index: new_twitter
- match: { hits.total: 1 }
---
"Limit by query example in docs":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
index:
index: twitter
type: tweet
id: 2
body: { "user": "junk" }
- do:
indices.refresh: {}
- do:
reindex:
refresh: true
body:
source:
index: twitter
query:
match:
user: kimchy
dest:
index: new_twitter
- do:
search:
index: new_twitter
- match: { hits.total: 1 }
---
"Override type example in docs":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
index:
index: twitter
type: junk
id: 1
body: { "user": "kimchy" }
- do:
indices.refresh: {}
- do:
reindex:
refresh: true
body:
source:
index: twitter
type: tweet
dest:
index: new_twitter
type: chirp
- do:
search:
index: new_twitter
type: chirp
- match: { hits.total: 1 }
---
"Multi index, multi type example from docs":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
index:
index: blog
type: post
id: 1
body: { "user": "kimchy" }
- do:
indices.refresh: {}
- do:
reindex:
refresh: true
body:
source:
index: [twitter, blog]
type: [tweet, post]
dest:
index: all_together
- do:
search:
index: all_together
type: tweet
body:
query:
match:
user: kimchy
- match: { hits.total: 1 }
- do:
search:
index: all_together
type: post
body:
query:
match:
user: kimchy
- match: { hits.total: 1 }
---
"Limit by size example from docs":
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
index:
index: twitter
type: tweet
id: 2
body: { "user": "kimchy" }
- do:
indices.refresh: {}
- do:
reindex:
refresh: true
body:
size: 1
source:
index: twitter
dest:
index: new_twitter
- do:
search:
index: new_twitter
type: tweet
- match: { hits.total: 1 }
--- ---
"Source document without any fields works": "Source document without any fields works":
@ -417,6 +247,13 @@
id: 1 id: 1
- match: { _source: {} } - match: { _source: {} }
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"Reindex with source filtering": "Reindex with source filtering":
- do: - do:
@ -455,3 +292,10 @@
id: 1 id: 1
- match: { _source.text: "test" } - match: { _source.text: "test" }
- is_false: _source.filtered - is_false: _source.filtered
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}

View File

@ -46,6 +46,13 @@
text: test text: test
- match: {hits.total: 1} - match: {hits.total: 1}
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"Reindex from remote with query": "Reindex from remote with query":
- do: - do:
@ -95,6 +102,13 @@
match_all: {} match_all: {}
- match: {hits.total: 1} - match: {hits.total: 1}
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"Reindex from remote with routing": "Reindex from remote with routing":
- do: - do:
@ -137,6 +151,13 @@
text: test text: test
- match: {hits.total: 1} - match: {hits.total: 1}
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"Reindex from remote with parent/child": "Reindex from remote with parent/child":
- do: - do:
@ -206,6 +227,13 @@
text: test text: test
- match: {hits.total: 1} - match: {hits.total: 1}
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"Reindex from remote with timeouts": "Reindex from remote with timeouts":
# Validates that you can configure the socket_timeout and connect_timeout, # Validates that you can configure the socket_timeout and connect_timeout,
@ -258,6 +286,76 @@
text: test text: test
- match: {hits.total: 1} - match: {hits.total: 1}
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
---
"Reindex from remote with size":
- do:
index:
index: source
type: foo
id: 1
body: { "text": "test" }
refresh: true
- do:
index:
index: source
type: foo
id: 2
body: { "text": "test" }
refresh: true
# Fetch the http host. We use the host of the master because we know there will always be a master.
- do:
cluster.state: {}
- set: { master_node: master }
- do:
nodes.info:
metric: [ http ]
- is_true: nodes.$master.http.publish_address
- set: {nodes.$master.http.publish_address: host}
- do:
reindex:
refresh: true
body:
size: 1
source:
remote:
host: http://${host}
index: source
dest:
index: dest
- match: {created: 1}
- match: {updated: 0}
- match: {version_conflicts: 0}
- match: {batches: 1}
- match: {failures: []}
- match: {throttled_millis: 0}
- gte: { took: 0 }
- is_false: task
- is_false: deleted
- do:
search:
index: dest
body:
query:
match:
text: test
- match: {hits.total: 1}
# Make sure reindex closed all the scroll contexts
- do:
indices.stats:
index: source
metric: search
- match: {indices.source.total.search.open_contexts: 0}
--- ---
"Reindex from remote with broken query": "Reindex from remote with broken query":
- do: - do: