Fix reindex to work with master branch

Stuff changed, reindex's got to change.
This commit is contained in:
Nik Everett 2016-02-29 10:04:16 -05:00
parent c38119bae9
commit d587a74533
5 changed files with 66 additions and 24 deletions

View File

@ -223,6 +223,13 @@ public class IndexRequest extends ReplicationRequest<IndexRequest> implements Do
return validationException;
}
/**
* The content type that will be used when generating a document from user provided objects like Maps.
*/
public XContentType getContentType() {
return contentType;
}
/**
* Sets the content type that will be used when generating a document from user provided objects (like Map).
*/

View File

@ -32,6 +32,7 @@ import org.elasticsearch.rest.BytesRestResponse;
import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.RestStatus;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import org.elasticsearch.tasks.LoggingTaskListener;
import org.elasticsearch.tasks.Task;
@ -40,13 +41,15 @@ import java.io.IOException;
public abstract class AbstractBaseReindexRestHandler<Request extends ActionRequest<Request>, Response extends BulkIndexByScrollResponse,
TA extends TransportAction<Request, Response>> extends BaseRestHandler {
protected final IndicesQueriesRegistry indicesQueriesRegistry;
protected final AggregatorParsers aggParsers;
private final ClusterService clusterService;
private final TA action;
protected AbstractBaseReindexRestHandler(Settings settings, Client client,
IndicesQueriesRegistry indicesQueriesRegistry, ClusterService clusterService, TA action) {
IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, ClusterService clusterService, TA action) {
super(settings, client);
this.indicesQueriesRegistry = indicesQueriesRegistry;
this.aggParsers = aggParsers;
this.clusterService = clusterService;
this.action = action;
}

View File

@ -42,6 +42,7 @@ import org.elasticsearch.rest.RestChannel;
import org.elasticsearch.rest.RestController;
import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import java.io.IOException;
import java.util.List;
@ -55,9 +56,9 @@ import static org.elasticsearch.rest.RestStatus.BAD_REQUEST;
* Expose IndexBySearchRequest over rest.
*/
public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexRequest, ReindexResponse, TransportReindexAction> {
private static final ObjectParser<ReindexRequest, QueryParseContext> PARSER = new ObjectParser<>("reindex");
private static final ObjectParser<ReindexRequest, ReindexParseContext> PARSER = new ObjectParser<>("reindex");
static {
ObjectParser.Parser<SearchRequest, QueryParseContext> sourceParser = (parser, search, context) -> {
ObjectParser.Parser<SearchRequest, ReindexParseContext> sourceParser = (parser, search, context) -> {
/*
* Extract the parameters that we need from the parser. We could do
* away with this hack when search source has an ObjectParser.
@ -74,8 +75,8 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
XContentBuilder builder = XContentFactory.contentBuilder(parser.contentType());
builder.map(source);
parser = parser.contentType().xContent().createParser(builder.bytes());
context.reset(parser);
search.source().parseXContent(parser, context);
context.queryParseContext.reset(parser);
search.source().parseXContent(parser, context.queryParseContext, context.aggParsers);
};
ObjectParser<IndexRequest, Void> destParser = new ObjectParser<>("dest");
@ -93,14 +94,16 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
PARSER.declareField((p, v, c) -> sourceParser.parse(p, v.getSource(), c), new ParseField("source"), ValueType.OBJECT);
PARSER.declareField((p, v, c) -> destParser.parse(p, v.getDestination(), null), new ParseField("dest"), ValueType.OBJECT);
PARSER.declareInt(ReindexRequest::setSize, new ParseField("size"));
PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p, c.parseFieldMatcher())), new ParseField("script"), ValueType.OBJECT);
PARSER.declareField((p, v, c) -> v.setScript(Script.parse(p, c.queryParseContext.parseFieldMatcher())), new ParseField("script"),
ValueType.OBJECT);
PARSER.declareString(ReindexRequest::setConflicts, new ParseField("conflicts"));
}
@Inject
public RestReindexAction(Settings settings, RestController controller, Client client,
IndicesQueriesRegistry indicesQueriesRegistry, ClusterService clusterService, TransportReindexAction action) {
super(settings, client, indicesQueriesRegistry, clusterService, action);
IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, ClusterService clusterService,
TransportReindexAction action) {
super(settings, client, indicesQueriesRegistry, aggParsers, clusterService, action);
controller.registerHandler(POST, "/_reindex", this);
}
@ -114,7 +117,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
ReindexRequest internalRequest = new ReindexRequest(new SearchRequest(), new IndexRequest());
try (XContentParser xcontent = XContentFactory.xContent(request.content()).createParser(request.content())) {
PARSER.parse(xcontent, internalRequest, new QueryParseContext(indicesQueriesRegistry));
PARSER.parse(xcontent, internalRequest, new ReindexParseContext(new QueryParseContext(indicesQueriesRegistry), aggParsers));
} catch (ParsingException e) {
logger.warn("Bad request", e);
badRequest(channel, e.getDetailedMessage());
@ -162,4 +165,14 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
throw new IllegalArgumentException("Expected [" + name + "] to be a list of a string but was [" + value + ']');
}
}
private class ReindexParseContext {
private final QueryParseContext queryParseContext;
private final AggregatorParsers aggParsers;
public ReindexParseContext(QueryParseContext queryParseContext, AggregatorParsers aggParsers) {
this.queryParseContext = queryParseContext;
this.aggParsers = aggParsers;
}
}
}

View File

@ -37,6 +37,7 @@ import org.elasticsearch.rest.RestRequest;
import org.elasticsearch.rest.action.search.RestSearchAction;
import org.elasticsearch.rest.action.support.RestActions;
import org.elasticsearch.script.Script;
import org.elasticsearch.search.aggregations.AggregatorParsers;
import java.util.Map;
@ -48,9 +49,9 @@ public class RestUpdateByQueryAction extends
AbstractBaseReindexRestHandler<UpdateByQueryRequest, BulkIndexByScrollResponse, TransportUpdateByQueryAction> {
@Inject
public RestUpdateByQueryAction(Settings settings, RestController controller, Client client,
IndicesQueriesRegistry indicesQueriesRegistry, ClusterService clusterService,
IndicesQueriesRegistry indicesQueriesRegistry, AggregatorParsers aggParsers, ClusterService clusterService,
TransportUpdateByQueryAction action) {
super(settings, client, indicesQueriesRegistry, clusterService, action);
super(settings, client, indicesQueriesRegistry, aggParsers, clusterService, action);
controller.registerHandler(POST, "/{index}/_update_by_query", this);
controller.registerHandler(POST, "/{index}/{type}/_update_by_query", this);
}
@ -95,7 +96,7 @@ public class RestUpdateByQueryAction extends
}
}
RestSearchAction.parseSearchRequest(internalRequest.getSource(), indicesQueriesRegistry, request,
parseFieldMatcher, bodyContent);
parseFieldMatcher, aggParsers, bodyContent);
String conflicts = request.param("conflicts");
if (conflicts != null) {

View File

@ -122,26 +122,44 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
@Override
protected IndexRequest buildIndexRequest(SearchHit doc) {
IndexRequest index = new IndexRequest(mainRequest.getDestination());
IndexRequest index = new IndexRequest();
// We want the index from the copied request, not the doc.
index.id(doc.id());
if (index.type() == null) {
/*
* Default to doc's type if not specified in request so its easy
* to do a scripted update.
*/
// Copy the index from the request so we always write where it asked to write
index.index(mainRequest.getDestination().index());
// If the request override's type then the user wants all documents in that type. Otherwise keep the doc's type.
if (mainRequest.getDestination().type() == null) {
index.type(doc.type());
} else {
index.type(mainRequest.getDestination().type());
}
index.source(doc.sourceRef());
/*
* Internal versioning can just use what we copied from the
* destionation request. Otherwise we assume we're using external
* Internal versioning can just use what we copied from the destination request. Otherwise we assume we're using external
* versioning and use the doc's version.
*/
if (index.versionType() != INTERNAL) {
index.versionType(mainRequest.getDestination().versionType());
if (index.versionType() == INTERNAL) {
index.version(mainRequest.getDestination().version());
} else {
index.version(doc.version());
}
// id and source always come from the found doc. Scripts can change them but they operate on the index request.
index.id(doc.id());
index.source(doc.sourceRef());
/*
* The rest of the index request just has to be copied from the template. It may be changed later from scripts or the superclass
* here on out operates on the index request rather than the template.
*/
index.routing(mainRequest.getDestination().routing());
index.parent(mainRequest.getDestination().parent());
index.timestamp(mainRequest.getDestination().timestamp());
index.ttl(mainRequest.getDestination().ttl());
index.contentType(mainRequest.getDestination().getContentType());
// OpType is synthesized from version so it is handled when we copy version above.
return index;
}