[reindex] Add ingest support

This commit is contained in:
Nik Everett 2016-03-03 10:53:34 -05:00
parent 8ee17d0a86
commit 4d6cb34417
8 changed files with 118 additions and 2 deletions

View File

@ -60,8 +60,8 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
static { static {
ObjectParser.Parser<SearchRequest, ReindexParseContext> sourceParser = (parser, search, context) -> { ObjectParser.Parser<SearchRequest, ReindexParseContext> sourceParser = (parser, search, context) -> {
/* /*
* Extract the parameters that we need from the parser. We could do * Extract the parameters that we need from the source sent to the parser. We could do away with this hack when search source
* away with this hack when search source has an ObjectParser. * has an ObjectParser.
*/ */
Map<String, Object> source = parser.map(); Map<String, Object> source = parser.map();
String[] indices = extractStringArray(source, "index"); String[] indices = extractStringArray(source, "index");
@ -84,6 +84,7 @@ public class RestReindexAction extends AbstractBaseReindexRestHandler<ReindexReq
destParser.declareString(IndexRequest::type, new ParseField("type")); destParser.declareString(IndexRequest::type, new ParseField("type"));
destParser.declareString(IndexRequest::routing, new ParseField("routing")); destParser.declareString(IndexRequest::routing, new ParseField("routing"));
destParser.declareString(IndexRequest::opType, new ParseField("opType")); destParser.declareString(IndexRequest::opType, new ParseField("opType"));
destParser.declareString(IndexRequest::setPipeline, new ParseField("pipeline"));
destParser.declareString((s, i) -> s.versionType(VersionType.fromString(i)), new ParseField("versionType")); destParser.declareString((s, i) -> s.versionType(VersionType.fromString(i)), new ParseField("versionType"));
// These exist just so the user can get a nice validation error: // These exist just so the user can get a nice validation error:

View File

@ -105,8 +105,10 @@ public class RestUpdateByQueryAction extends
parseCommon(internalRequest, request); parseCommon(internalRequest, request);
internalRequest.setSize(internalRequest.getSearchRequest().source().size()); internalRequest.setSize(internalRequest.getSearchRequest().source().size());
internalRequest.setPipeline(request.param("pipeline"));
internalRequest.getSearchRequest().source().size(request.paramAsInt("scroll_size", scrollSize)); internalRequest.getSearchRequest().source().size(request.paramAsInt("scroll_size", scrollSize));
execute(request, internalRequest, channel); execute(request, internalRequest, channel);
} }
} }

View File

@ -158,6 +158,7 @@ public class TransportReindexAction extends HandledTransportAction<ReindexReques
index.timestamp(mainRequest.getDestination().timestamp()); index.timestamp(mainRequest.getDestination().timestamp());
index.ttl(mainRequest.getDestination().ttl()); index.ttl(mainRequest.getDestination().ttl());
index.contentType(mainRequest.getDestination().getContentType()); index.contentType(mainRequest.getDestination().getContentType());
index.setPipeline(mainRequest.getDestination().getPipeline());
// OpType is synthesized from version so it is handled when we copy version above. // OpType is synthesized from version so it is handled when we copy version above.
return index; return index;

View File

@ -90,6 +90,7 @@ public class TransportUpdateByQueryAction extends HandledTransportAction<UpdateB
index.source(doc.sourceRef()); index.source(doc.sourceRef());
index.versionType(VersionType.INTERNAL); index.versionType(VersionType.INTERNAL);
index.version(doc.version()); index.version(doc.version());
index.setPipeline(mainRequest.getPipeline());
return index; return index;
} }

View File

@ -26,6 +26,11 @@ import org.elasticsearch.action.search.SearchRequest;
* locations or IDs. * locations or IDs.
*/ */
public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<UpdateByQueryRequest> { public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<UpdateByQueryRequest> {
/**
* Ingest pipeline to set on index requests made by this action.
*/
private String pipeline;
public UpdateByQueryRequest() { public UpdateByQueryRequest() {
} }
@ -33,6 +38,20 @@ public class UpdateByQueryRequest extends AbstractBulkIndexByScrollRequest<Updat
super(search); super(search);
} }
/**
* Set the ingest pipeline to set on index requests made by this action.
*/
public void setPipeline(String pipeline) {
this.pipeline = pipeline;
}
/**
* Ingest pipeline to set on index requests made by this action.
*/
public String getPipeline() {
return pipeline;
}
@Override @Override
protected UpdateByQueryRequest self() { protected UpdateByQueryRequest self() {
return this; return this;

View File

@ -0,0 +1,46 @@
---
"Modify a document":
- do:
ingest.put_pipeline:
id: "test_ingest"
body: >
{
"description": "tests reindex with ingest",
"processors": [
{
"append" : {
"field" : "new_field",
"value": "cat"
}
}
]
}
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
indices.refresh: {}
- do:
reindex:
refresh: true
body:
source:
index: twitter
dest:
index: new_twitter
pipeline: test_ingest
- match: {created: 1}
- match: {noops: 0}
- do:
search:
index: new_twitter
body:
query:
match:
new_field: cat
- match: { hits.total: 1 }

View File

@ -0,0 +1,42 @@
---
"Update a document using update-by-query":
- do:
ingest.put_pipeline:
id: "test_ingest"
body: >
{
"description": "tests reindex with ingest",
"processors": [
{
"append" : {
"field" : "new_field",
"value": "cat"
}
}
]
}
- do:
index:
index: twitter
type: tweet
id: 1
body: { "user": "kimchy" }
- do:
indices.refresh: {}
- do:
update-by-query:
index: twitter
refresh: true
pipeline: test_ingest
- match: {updated: 1}
- match: {noops: 0}
- do:
search:
index: twitter
body:
query:
match:
new_field: cat
- match: { hits.total: 1 }

View File

@ -80,6 +80,10 @@
"type" : "boolean", "type" : "boolean",
"description" : "Specify whether query terms should be lowercased" "description" : "Specify whether query terms should be lowercased"
}, },
"pipeline": {
"type" : "string",
"description" : "Ingest pipeline to set on index requests made by this action. (default: none)"
},
"preference": { "preference": {
"type" : "string", "type" : "string",
"description" : "Specify the node or shard the operation should be performed on (default: random)" "description" : "Specify the node or shard the operation should be performed on (default: random)"