Added missing metadata fields to upserted documents (parent, routing, ttl, timestamp, version and versionType)

Closes #3444
This commit is contained in:
Luca Cavanna 2013-08-06 12:00:07 +02:00
parent 88a0e4628a
commit 636c35d0d4
2 changed files with 110 additions and 9 deletions

View File

@ -340,9 +340,30 @@ public class BulkRequest extends ActionRequest<BulkRequest> {
.create(true)
.source(data.slice(from, nextMarker - from), contentUnsafe), payload);
} else if ("update".equals(action)) {
internalAdd(new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
UpdateRequest updateRequest = new UpdateRequest(index, type, id).routing(routing).parent(parent).retryOnConflict(retryOnConflict)
.version(version).versionType(versionType)
.source(data.slice(from, nextMarker - from)), payload);
.source(data.slice(from, nextMarker - from));
IndexRequest upsertRequest = updateRequest.upsertRequest();
if (upsertRequest != null) {
upsertRequest.routing(routing);
upsertRequest.parent(parent); // order is important, set it after routing, so it will set the routing
upsertRequest.timestamp(timestamp);
upsertRequest.ttl(ttl);
upsertRequest.version(version);
upsertRequest.versionType(versionType);
}
IndexRequest doc = updateRequest.doc();
if (doc != null) {
doc.routing(routing);
doc.parent(parent); // order is important, set it after routing, so it will set the routing
doc.timestamp(timestamp);
doc.ttl(ttl);
doc.version(version);
doc.versionType(versionType);
}
internalAdd(updateRequest, payload);
}
// move pointers
from = nextMarker + 1;

View File

@ -4,19 +4,19 @@ import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.index.VersionType;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.test.integration.AbstractSharedClusterTest;
import org.junit.Test;
import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
*/
@ -25,8 +25,6 @@ public class BulkTests extends AbstractSharedClusterTest {
@Test
public void testBulkUpdate_simple() throws Exception {
client().admin().indices().prepareDelete().execute().actionGet();
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
@ -199,8 +197,6 @@ public class BulkTests extends AbstractSharedClusterTest {
@Test
public void testBulkUpdate_largerVolume() throws Exception {
client().admin().indices().prepareDelete().execute().actionGet();
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
@ -337,4 +333,88 @@ public class BulkTests extends AbstractSharedClusterTest {
}
}
//Test for https://github.com/elasticsearch/elasticsearch/issues/3444
@Test
public void testBulkUpdateDocAsUpsertWithParent() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
).addMapping("child", "{\"child\": {\"_parent\": {\"type\": \"parent\"}}}")
.execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
BulkRequestBuilder builder = client().prepareBulk();
byte[] addParent = ("{\"index\" : { \"_index\" : \"test\", \"_type\" : \"parent\", \"_id\" : \"parent1\"}}\n" +
"{\"field1\" : \"value1\"}\n").getBytes("utf-8");
byte[] addChild = ("{ \"update\" : { \"_index\" : \"test\", \"_type\" : \"child\", \"_id\" : \"child1\", \"parent\" : \"parent1\"}}\n" +
"{\"doc\" : { \"field1\" : \"value1\"}, \"doc_as_upsert\" : \"true\"}\n").getBytes("utf-8");
builder.add(addParent, 0, addParent.length, false);
builder.add(addChild, 0, addChild.length, false);
BulkResponse bulkResponse = builder.get();
assertThat(bulkResponse.getItems().length, equalTo(2));
assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
assertThat(bulkResponse.getItems()[1].isFailed(), equalTo(false));
client().admin().indices().prepareRefresh("test").get();
//we check that the _parent field was set on the child document by using the has parent query
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(QueryBuilders.hasParentQuery("parent", QueryBuilders.matchAllQuery()))
.get();
assertThat(searchResponse.getFailedShards(), equalTo(0));
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getId(), equalTo("child1"));
}
@Test
public void testBulkUpdateUpsertWithParent() throws Exception {
client().admin().indices().prepareCreate("test")
.setSettings(
ImmutableSettings.settingsBuilder()
.put("index.number_of_shards", 2)
.put("index.number_of_replicas", 1)
).addMapping("child", "{\"child\": {\"_parent\": {\"type\": \"parent\"}}}")
.execute().actionGet();
client().admin().cluster().prepareHealth().setWaitForEvents(Priority.LANGUID).setWaitForGreenStatus().execute().actionGet();
BulkRequestBuilder builder = client().prepareBulk();
byte[] addParent = ("{\"index\" : { \"_index\" : \"test\", \"_type\" : \"parent\", \"_id\" : \"parent1\"}}\n" +
"{\"field1\" : \"value1\"}\n").getBytes("utf-8");
byte[] addChild = ("{\"update\" : { \"_id\" : \"child1\", \"_type\" : \"child\", \"_index\" : \"test\", \"parent\" : \"parent1\"} }\n" +
"{ \"script\" : \"ctx._source.field2 = 'value2'\", \"upsert\" : {\"field1\" : \"value1\"}}\n").getBytes("utf-8");
builder.add(addParent, 0, addParent.length, false);
builder.add(addChild, 0, addChild.length, false);
BulkResponse bulkResponse = builder.get();
assertThat(bulkResponse.getItems().length, equalTo(2));
assertThat(bulkResponse.getItems()[0].isFailed(), equalTo(false));
assertThat(bulkResponse.getItems()[1].isFailed(), equalTo(false));
client().admin().indices().prepareRefresh("test").get();
SearchResponse searchResponse = client().prepareSearch("test")
.setQuery(QueryBuilders.hasParentQuery("parent", QueryBuilders.matchAllQuery()))
.get();
assertThat(searchResponse.getFailedShards(), equalTo(0));
SearchHit[] hits = searchResponse.getHits().getHits();
assertThat(hits.length, equalTo(1));
assertThat(hits[0].getId(), equalTo("child1"));
}
}