Cleanup reindex after picking up master

Tasks are now constructed in a simpler way.
Max line length of 140 is enforced.
This commit is contained in:
Nik Everett 2016-02-10 14:26:45 -05:00
parent 0da30d5eae
commit f45d4b276e
15 changed files with 36 additions and 19 deletions

View File

@ -52,7 +52,9 @@ import static java.util.Collections.emptyMap;
* Abstract base for scrolling across a search and executing bulk indexes on all * Abstract base for scrolling across a search and executing bulk indexes on all
* results. * results.
*/ */
public abstract class AbstractAsyncBulkIndexByScrollAction<Request extends AbstractBulkIndexByScrollRequest<Request>, Response extends BulkIndexByScrollResponse> public abstract class AbstractAsyncBulkIndexByScrollAction<
Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse>
extends AbstractAsyncBulkByScrollAction<Request, Response> { extends AbstractAsyncBulkByScrollAction<Request, Response> {
private final ScriptService scriptService; private final ScriptService scriptService;

View File

@ -254,7 +254,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
@Override @Override
public Task createTask(long id, String type, String action) { public Task createTask(long id, String type, String action) {
return new BulkByScrollTask(id, type, action, this::getDescription); return new BulkByScrollTask(id, type, action, getDescription());
} }
@Override @Override

View File

@ -28,7 +28,10 @@ import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilder;
public abstract class AbstractBulkByScrollRequestBuilder<Request extends AbstractBulkByScrollRequest<Request>, Response extends ActionResponse, Self extends AbstractBulkByScrollRequestBuilder<Request, Response, Self>> public abstract class AbstractBulkByScrollRequestBuilder<
Request extends AbstractBulkByScrollRequest<Request>,
Response extends ActionResponse,
Self extends AbstractBulkByScrollRequestBuilder<Request, Response, Self>>
extends ActionRequestBuilder<Request, Response, Self> { extends ActionRequestBuilder<Request, Response, Self> {
private final SearchRequestBuilder source; private final SearchRequestBuilder source;

View File

@ -25,7 +25,10 @@ import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.client.ElasticsearchClient; import org.elasticsearch.client.ElasticsearchClient;
import org.elasticsearch.script.Script; import org.elasticsearch.script.Script;
public abstract class AbstractBulkIndexByScrollRequestBuilder<Request extends AbstractBulkIndexByScrollRequest<Request>, Response extends ActionResponse, Self extends AbstractBulkIndexByScrollRequestBuilder<Request, Response, Self>> public abstract class AbstractBulkIndexByScrollRequestBuilder<
Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends ActionResponse,
Self extends AbstractBulkIndexByScrollRequestBuilder<Request, Response, Self>>
extends AbstractBulkByScrollRequestBuilder<Request, Response, Self> { extends AbstractBulkByScrollRequestBuilder<Request, Response, Self> {
protected AbstractBulkIndexByScrollRequestBuilder(ElasticsearchClient client, protected AbstractBulkIndexByScrollRequestBuilder(ElasticsearchClient client,

View File

@ -19,7 +19,6 @@
package org.elasticsearch.plugin.reindex; package org.elasticsearch.plugin.reindex;
import org.elasticsearch.common.inject.Provider;
import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.xcontent.XContentBuilder; import org.elasticsearch.common.xcontent.XContentBuilder;
@ -46,7 +45,7 @@ public class BulkByScrollTask extends Task {
private final AtomicLong versionConflicts = new AtomicLong(0); private final AtomicLong versionConflicts = new AtomicLong(0);
private final AtomicLong retries = new AtomicLong(0); private final AtomicLong retries = new AtomicLong(0);
public BulkByScrollTask(long id, String type, String action, Provider<String> description) { public BulkByScrollTask(long id, String type, String action, String description) {
super(id, type, action, description); super(id, type, action, description);
} }

View File

@ -87,7 +87,8 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
// These exist just so the user can get a nice validation error: // These exist just so the user can get a nice validation error:
destParser.declareString(IndexRequest::timestamp, new ParseField("timestamp")); destParser.declareString(IndexRequest::timestamp, new ParseField("timestamp"));
destParser.declareString((i, ttl) -> i.ttl(parseTimeValue(ttl, TimeValue.timeValueMillis(-1), "ttl").millis()), new ParseField("ttl")); destParser.declareString((i, ttl) -> i.ttl(parseTimeValue(ttl, TimeValue.timeValueMillis(-1), "ttl").millis()),
new ParseField("ttl"));
PARSER.declareField((p, v, c) -> sourceParser.parse(p, v.getSource(), c), new ParseField("source"), ValueType.OBJECT); PARSER.declareField((p, v, c) -> sourceParser.parse(p, v.getSource(), c), new ParseField("source"), ValueType.OBJECT);
PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), null), new ParseField("dest"), ValueType.OBJECT); PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), null), new ParseField("dest"), ValueType.OBJECT);

View File

@ -115,8 +115,8 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
* possible. * possible.
*/ */
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> { static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> {
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool, public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client,
ReindexRequest request, ActionListener<ReindexResponse> listener) { ThreadPool threadPool, ReindexRequest request, ActionListener<ReindexResponse> listener) {
super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener); super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener);
} }

View File

@ -76,8 +76,8 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
* Simple implementation of update-by-query using scrolling and bulk. * Simple implementation of update-by-query using scrolling and bulk.
*/ */
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> { static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> {
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client, ThreadPool threadPool, public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client,
UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) { ThreadPool threadPool, UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener); super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener);
} }

View File

@ -34,7 +34,9 @@ import java.util.function.Consumer;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<Request extends AbstractBulkIndexByScrollRequest<Request>, Response extends BulkIndexByScrollResponse> public abstract class AbstractAsyncBulkIndexByScrollActionScriptTestCase<
Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse>
extends AbstractAsyncBulkIndexByScrollActionTestCase<Request, Response> { extends AbstractAsyncBulkIndexByScrollActionTestCase<Request, Response> {
protected IndexRequest applyScript(Consumer<Map<String, Object>> scriptBody) { protected IndexRequest applyScript(Consumer<Map<String, Object>> scriptBody) {
IndexRequest index = new IndexRequest("index", "type", "1").source(singletonMap("foo", "bar")); IndexRequest index = new IndexRequest("index", "type", "1").source(singletonMap("foo", "bar"));

View File

@ -25,7 +25,9 @@ import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<Request extends AbstractBulkIndexByScrollRequest<Request>, Response extends BulkIndexByScrollResponse> public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<
Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse>
extends ESTestCase { extends ESTestCase {
protected ThreadPool threadPool; protected ThreadPool threadPool;
protected BulkByScrollTask task; protected BulkByScrollTask task;
@ -33,7 +35,7 @@ public abstract class AbstractAsyncBulkIndexByScrollActionTestCase<Request exten
@Before @Before
public void setupForTest() { public void setupForTest() {
threadPool = new ThreadPool(getTestName()); threadPool = new ThreadPool(getTestName());
task = new BulkByScrollTask(1, "test", "test", () -> "test"); task = new BulkByScrollTask(1, "test", "test", "test");
} }
@After @After

View File

@ -32,7 +32,9 @@ import static java.util.Collections.singletonList;
import static java.util.Collections.singletonMap; import static java.util.Collections.singletonMap;
import static org.elasticsearch.common.unit.TimeValue.timeValueMillis; import static org.elasticsearch.common.unit.TimeValue.timeValueMillis;
public abstract class AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<Request extends AbstractBulkIndexByScrollRequest<Request>, Response extends BulkIndexByScrollResponse> public abstract class AbstractAsyncBulkIndexbyScrollActionMetadataTestCase<
Request extends AbstractBulkIndexByScrollRequest<Request>,
Response extends BulkIndexByScrollResponse>
extends AbstractAsyncBulkIndexByScrollActionTestCase<Request, Response> { extends AbstractAsyncBulkIndexByScrollActionTestCase<Request, Response> {
/** /**

View File

@ -25,7 +25,9 @@ import org.hamcrest.TypeSafeMatcher;
import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.equalTo;
public abstract class AbstractBulkIndexByScrollResponseMatcher<Response extends BulkIndexByScrollResponse, Self extends AbstractBulkIndexByScrollResponseMatcher<Response, Self>> public abstract class AbstractBulkIndexByScrollResponseMatcher<
Response extends BulkIndexByScrollResponse,
Self extends AbstractBulkIndexByScrollResponseMatcher<Response, Self>>
extends TypeSafeMatcher<Response> { extends TypeSafeMatcher<Response> {
private Matcher<Long> updatedMatcher = equalTo(0L); private Matcher<Long> updatedMatcher = equalTo(0L);
/** /**

View File

@ -99,7 +99,7 @@ public class AsyncBulkByScrollActionTests extends ESTestCase {
firstSearchRequest = null; firstSearchRequest = null;
listener = new PlainActionFuture<>(); listener = new PlainActionFuture<>();
scrollId = null; scrollId = null;
task = new BulkByScrollTask(0, "test", "test", () -> "test"); task = new BulkByScrollTask(0, "test", "test", "test");
} }
@After @After

View File

@ -27,7 +27,7 @@ public class BulkByScrollTaskTests extends ESTestCase {
@Before @Before
public void createTask() { public void createTask() {
task = new BulkByScrollTask(1, "test_type", "test_action", () -> "test"); task = new BulkByScrollTask(1, "test_type", "test_action", "test");
} }
public void testBasicData() { public void testBasicData() {

View File

@ -77,7 +77,8 @@ public class UpdateByQueryWhileModifyingTests extends UpdateByQueryTestCase {
break; break;
} catch (VersionConflictEngineException e) { } catch (VersionConflictEngineException e) {
if (attempts >= MAX_ATTEMPTS) { if (attempts >= MAX_ATTEMPTS) {
throw new RuntimeException("Failed to index after [" + MAX_ATTEMPTS + "] attempts. Too many version conflicts!"); throw new RuntimeException(
"Failed to index after [" + MAX_ATTEMPTS + "] attempts. Too many version conflicts!");
} }
logger.info( logger.info(
"Caught expected version conflict trying to perform mutation number {} with version {}. Retrying.", "Caught expected version conflict trying to perform mutation number {} with version {}. Retrying.",