Reindex: rename source to searchRequest
This makes the code easier to read for those familiar with the Elasticsearch code base.
This commit is contained in:
parent
e17d8bda93
commit
aeed7ee218
|
@ -46,7 +46,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
|
||||||
/**
|
/**
|
||||||
* The search to be executed.
|
* The search to be executed.
|
||||||
*/
|
*/
|
||||||
private SearchRequest source;
|
private SearchRequest searchRequest;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Maximum number of processed documents. Defaults to -1 meaning process all
|
* Maximum number of processed documents. Defaults to -1 meaning process all
|
||||||
|
@ -89,7 +89,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
|
||||||
}
|
}
|
||||||
|
|
||||||
public AbstractBulkByScrollRequest(SearchRequest source) {
|
public AbstractBulkByScrollRequest(SearchRequest source) {
|
||||||
this.source = source;
|
this.searchRequest = source;
|
||||||
|
|
||||||
// Set the defaults which differ from SearchRequest's defaults.
|
// Set the defaults which differ from SearchRequest's defaults.
|
||||||
source.scroll(DEFAULT_SCROLL_TIMEOUT);
|
source.scroll(DEFAULT_SCROLL_TIMEOUT);
|
||||||
|
@ -106,8 +106,8 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public ActionRequestValidationException validate() {
|
public ActionRequestValidationException validate() {
|
||||||
ActionRequestValidationException e = source.validate();
|
ActionRequestValidationException e = searchRequest.validate();
|
||||||
if (source.source().from() != -1) {
|
if (searchRequest.source().from() != -1) {
|
||||||
e = addValidationError("from is not supported in this context", e);
|
e = addValidationError("from is not supported in this context", e);
|
||||||
}
|
}
|
||||||
if (maxRetries < 0) {
|
if (maxRetries < 0) {
|
||||||
|
@ -173,8 +173,8 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
|
||||||
/**
|
/**
|
||||||
* The search request that matches the documents to process.
|
* The search request that matches the documents to process.
|
||||||
*/
|
*/
|
||||||
public SearchRequest getSource() {
|
public SearchRequest getSearchRequest() {
|
||||||
return source;
|
return searchRequest;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -260,8 +260,8 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
|
||||||
@Override
|
@Override
|
||||||
public void readFrom(StreamInput in) throws IOException {
|
public void readFrom(StreamInput in) throws IOException {
|
||||||
super.readFrom(in);
|
super.readFrom(in);
|
||||||
source = new SearchRequest();
|
searchRequest = new SearchRequest();
|
||||||
source.readFrom(in);
|
searchRequest.readFrom(in);
|
||||||
abortOnVersionConflict = in.readBoolean();
|
abortOnVersionConflict = in.readBoolean();
|
||||||
size = in.readVInt();
|
size = in.readVInt();
|
||||||
refresh = in.readBoolean();
|
refresh = in.readBoolean();
|
||||||
|
@ -274,7 +274,7 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
|
||||||
@Override
|
@Override
|
||||||
public void writeTo(StreamOutput out) throws IOException {
|
public void writeTo(StreamOutput out) throws IOException {
|
||||||
super.writeTo(out);
|
super.writeTo(out);
|
||||||
source.writeTo(out);
|
searchRequest.writeTo(out);
|
||||||
out.writeBoolean(abortOnVersionConflict);
|
out.writeBoolean(abortOnVersionConflict);
|
||||||
out.writeVInt(size);
|
out.writeVInt(size);
|
||||||
out.writeBoolean(refresh);
|
out.writeBoolean(refresh);
|
||||||
|
@ -289,13 +289,13 @@ public abstract class AbstractBulkByScrollRequest<Self extends AbstractBulkByScr
|
||||||
* to make toString.
|
* to make toString.
|
||||||
*/
|
*/
|
||||||
protected void searchToString(StringBuilder b) {
|
protected void searchToString(StringBuilder b) {
|
||||||
if (source.indices() != null && source.indices().length != 0) {
|
if (searchRequest.indices() != null && searchRequest.indices().length != 0) {
|
||||||
b.append(Arrays.toString(source.indices()));
|
b.append(Arrays.toString(searchRequest.indices()));
|
||||||
} else {
|
} else {
|
||||||
b.append("[all indices]");
|
b.append("[all indices]");
|
||||||
}
|
}
|
||||||
if (source.types() != null && source.types().length != 0) {
|
if (searchRequest.types() != null && searchRequest.types().length != 0) {
|
||||||
b.append(Arrays.toString(source.types()));
|
b.append(Arrays.toString(searchRequest.types()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -53,7 +53,7 @@ public class ReindexRequest extends AbstractBulkIndexByScrollRequest<ReindexRequ
|
||||||
@Override
|
@Override
|
||||||
public ActionRequestValidationException validate() {
|
public ActionRequestValidationException validate() {
|
||||||
ActionRequestValidationException e = super.validate();
|
ActionRequestValidationException e = super.validate();
|
||||||
if (getSource().indices() == null || getSource().indices().length == 0) {
|
if (getSearchRequest().indices() == null || getSearchRequest().indices().length == 0) {
|
||||||
e = addValidationError("use _all if you really want to copy from all existing indexes", e);
|
e = addValidationError("use _all if you really want to copy from all existing indexes", e);
|
||||||
}
|
}
|
||||||
/*
|
/*
|
||||||
|
|
|
@ -91,7 +91,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
|
||||||
destParser.declareString((i, ttl) -> i.ttl(parseTimeValue(ttl, TimeValue.timeValueMillis(-1), "ttl").millis()),
|
destParser.declareString((i, ttl) -> i.ttl(parseTimeValue(ttl, TimeValue.timeValueMillis(-1), "ttl").millis()),
|
||||||
new ParseField("ttl"));
|
new ParseField("ttl"));
|
||||||
|
|
||||||
PARSER.declareField((p, v, c) -> sourceParser.parse(p, v.getSource(), c), new ParseField("source"), ValueType.OBJECT);
|
PARSER.declareField((p, v, c) -> sourceParser.parse(p, v.getSearchRequest(), c), new ParseField("source"), ValueType.OBJECT);
|
||||||
PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), null), new ParseField("dest"), ValueType.OBJECT);
|
PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), null), new ParseField("dest"), ValueType.OBJECT);
|
||||||
PARSER.declareInt(ReindexRequest::setSize, new ParseField("size"));
|
PARSER.declareInt(ReindexRequest::setSize, new ParseField("size"));
|
||||||
PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p, c.queryParseContext.parseFieldMatcher())), new ParseField("script"),
|
PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p, c.queryParseContext.parseFieldMatcher())), new ParseField("script"),
|
||||||
|
|
|
@ -64,8 +64,8 @@ public class RestUpdateByQueryAction extends
|
||||||
* defaults. Then the parse can override them.
|
* defaults. Then the parse can override them.
|
||||||
*/
|
*/
|
||||||
UpdateByQueryRequest internalRequest = new UpdateByQueryRequest(new SearchRequest());
|
UpdateByQueryRequest internalRequest = new UpdateByQueryRequest(new SearchRequest());
|
||||||
int scrollSize = internalRequest.getSource().source().size();
|
int scrollSize = internalRequest.getSearchRequest().source().size();
|
||||||
internalRequest.getSource().source().size(SIZE_ALL_MATCHES);
|
internalRequest.getSearchRequest().source().size(SIZE_ALL_MATCHES);
|
||||||
/*
|
/*
|
||||||
* We can't send parseSearchRequest REST content that it doesn't support
|
* We can't send parseSearchRequest REST content that it doesn't support
|
||||||
* so we will have to remove the content that is valid in addition to
|
* so we will have to remove the content that is valid in addition to
|
||||||
|
@ -95,7 +95,7 @@ public class RestUpdateByQueryAction extends
|
||||||
bodyContent = builder.bytes();
|
bodyContent = builder.bytes();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
RestSearchAction.parseSearchRequest(internalRequest.getSource(), indicesQueriesRegistry, request,
|
RestSearchAction.parseSearchRequest(internalRequest.getSearchRequest(), indicesQueriesRegistry, request,
|
||||||
parseFieldMatcher, aggParsers, bodyContent);
|
parseFieldMatcher, aggParsers, bodyContent);
|
||||||
|
|
||||||
String conflicts = request.param("conflicts");
|
String conflicts = request.param("conflicts");
|
||||||
|
@ -104,8 +104,8 @@ public class RestUpdateByQueryAction extends
|
||||||
}
|
}
|
||||||
parseCommon(internalRequest, request);
|
parseCommon(internalRequest, request);
|
||||||
|
|
||||||
internalRequest.setSize(internalRequest.getSource().source().size());
|
internalRequest.setSize(internalRequest.getSearchRequest().source().size());
|
||||||
internalRequest.getSource().source().size(request.paramAsInt("scroll_size", scrollSize));
|
internalRequest.getSearchRequest().source().size(request.paramAsInt("scroll_size", scrollSize));
|
||||||
|
|
||||||
execute(request, internalRequest, channel);
|
execute(request, internalRequest, channel);
|
||||||
}
|
}
|
||||||
|
|
|
@ -71,7 +71,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doExecute(Task task, ReindexRequest request, ActionListener<ReindexResponse> listener) {
|
protected void doExecute(Task task, ReindexRequest request, ActionListener<ReindexResponse> listener) {
|
||||||
validateAgainstAliases(request.getSource(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex,
|
validateAgainstAliases(request.getSearchRequest(), request.getDestination(), indexNameExpressionResolver, autoCreateIndex,
|
||||||
clusterService.state());
|
clusterService.state());
|
||||||
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, request, listener).start();
|
new AsyncIndexBySearchAction((BulkByScrollTask) task, logger, scriptService, client, threadPool, request, listener).start();
|
||||||
}
|
}
|
||||||
|
@ -117,7 +117,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
|
||||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> {
|
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<ReindexRequest, ReindexResponse> {
|
||||||
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client,
|
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client,
|
||||||
ThreadPool threadPool, ReindexRequest request, ActionListener<ReindexResponse> listener) {
|
ThreadPool threadPool, ReindexRequest request, ActionListener<ReindexResponse> listener) {
|
||||||
super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener);
|
super(task, logger, scriptService, client, threadPool, request, request.getSearchRequest(), listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -78,7 +78,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
|
||||||
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> {
|
static class AsyncIndexBySearchAction extends AbstractAsyncBulkIndexByScrollAction<UpdateByQueryRequest, BulkIndexByScrollResponse> {
|
||||||
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client,
|
public AsyncIndexBySearchAction(BulkByScrollTask task, ESLogger logger, ScriptService scriptService, Client client,
|
||||||
ThreadPool threadPool, UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
|
ThreadPool threadPool, UpdateByQueryRequest request, ActionListener<BulkIndexByScrollResponse> listener) {
|
||||||
super(task, logger, scriptService, client, threadPool, request, request.getSource(), listener);
|
super(task, logger, scriptService, client, threadPool, request, request.getSearchRequest(), listener);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -69,8 +69,8 @@ public class RoundTripTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void randomRequest(AbstractBulkIndexByScrollRequest<?> request) {
|
private void randomRequest(AbstractBulkIndexByScrollRequest<?> request) {
|
||||||
request.getSource().indices("test");
|
request.getSearchRequest().indices("test");
|
||||||
request.getSource().source().size(between(1, 1000));
|
request.getSearchRequest().source().size(between(1, 1000));
|
||||||
request.setSize(random().nextBoolean() ? between(1, Integer.MAX_VALUE) : -1);
|
request.setSize(random().nextBoolean() ? between(1, Integer.MAX_VALUE) : -1);
|
||||||
request.setAbortOnVersionConflict(random().nextBoolean());
|
request.setAbortOnVersionConflict(random().nextBoolean());
|
||||||
request.setRefresh(rarely());
|
request.setRefresh(rarely());
|
||||||
|
@ -81,8 +81,8 @@ public class RoundTripTests extends ESTestCase {
|
||||||
|
|
||||||
private void assertRequestEquals(AbstractBulkIndexByScrollRequest<?> request,
|
private void assertRequestEquals(AbstractBulkIndexByScrollRequest<?> request,
|
||||||
AbstractBulkIndexByScrollRequest<?> tripped) {
|
AbstractBulkIndexByScrollRequest<?> tripped) {
|
||||||
assertArrayEquals(request.getSource().indices(), tripped.getSource().indices());
|
assertArrayEquals(request.getSearchRequest().indices(), tripped.getSearchRequest().indices());
|
||||||
assertEquals(request.getSource().source().size(), tripped.getSource().source().size());
|
assertEquals(request.getSearchRequest().source().size(), tripped.getSearchRequest().source().size());
|
||||||
assertEquals(request.isAbortOnVersionConflict(), tripped.isAbortOnVersionConflict());
|
assertEquals(request.isAbortOnVersionConflict(), tripped.isAbortOnVersionConflict());
|
||||||
assertEquals(request.isRefresh(), tripped.isRefresh());
|
assertEquals(request.isRefresh(), tripped.isRefresh());
|
||||||
assertEquals(request.getTimeout(), tripped.getTimeout());
|
assertEquals(request.getTimeout(), tripped.getTimeout());
|
||||||
|
|
Loading…
Reference in New Issue