From 7aaf7e178223448b0c6adf26097bcc9e0416bf78 Mon Sep 17 00:00:00 2001 From: XAVIER VANDEMEULEBROUCKE Date: Mon, 27 Apr 2015 16:14:49 +0200 Subject: [PATCH] DATAES-158 - DATAES-159 --- .../core/ElasticsearchOperations.java | 17 +++-- .../core/ElasticsearchTemplate.java | 76 ++++++++++++++----- .../core/ElasticsearchTemplateTests.java | 52 +++++++++++-- 3 files changed, 111 insertions(+), 34 deletions(-) diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java index 9ec2067e0..83a6d4ac6 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchOperations.java @@ -15,16 +15,16 @@ */ package org.springframework.data.elasticsearch.core; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Set; - import org.elasticsearch.action.update.UpdateResponse; import org.springframework.data.domain.Page; import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; import org.springframework.data.elasticsearch.core.query.*; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; + /** * ElasticsearchOperations * @@ -323,6 +323,13 @@ public interface ElasticsearchOperations { */ void bulkIndex(List queries); + /** + * Bulk update all objects. Will do update + * + * @param queries + */ + void bulkUpdate(List queries); + /** * Delete the one object with provided id * diff --git a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java index 86aed7dba..6d2834dcd 100755 --- a/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java +++ b/src/main/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplate.java @@ -15,21 +15,6 @@ */ package org.springframework.data.elasticsearch.core; -import static org.apache.commons.collections.CollectionUtils.isNotEmpty; -import static org.apache.commons.lang.StringUtils.*; -import static org.elasticsearch.action.search.SearchType.*; -import static org.elasticsearch.client.Requests.*; -import static org.elasticsearch.cluster.metadata.AliasAction.Type.*; -import static org.elasticsearch.common.collect.Sets.*; -import static org.elasticsearch.index.VersionType.*; -import static org.springframework.data.elasticsearch.core.MappingBuilder.*; - -import java.io.BufferedReader; -import java.io.IOException; -import java.io.InputStreamReader; -import java.lang.reflect.Method; -import java.util.*; - import org.apache.commons.collections.CollectionUtils; import org.elasticsearch.action.ListenableActionFuture; import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest; @@ -95,6 +80,23 @@ import org.springframework.data.elasticsearch.core.query.*; import org.springframework.data.mapping.PersistentProperty; import org.springframework.util.Assert; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Method; +import java.util.*; + +import static org.apache.commons.collections.CollectionUtils.isNotEmpty; +import static org.apache.commons.lang.StringUtils.isBlank; +import static org.apache.commons.lang.StringUtils.isNotBlank; +import static org.elasticsearch.action.search.SearchType.SCAN; +import static org.elasticsearch.client.Requests.indicesExistsRequest; +import static org.elasticsearch.client.Requests.refreshRequest; +import static org.elasticsearch.cluster.metadata.AliasAction.Type.ADD; +import static org.elasticsearch.common.collect.Sets.newHashSet; +import static org.elasticsearch.index.VersionType.EXTERNAL; +import static org.springframework.data.elasticsearch.core.MappingBuilder.buildMapping; + /** * ElasticsearchTemplate * @@ -447,6 +449,10 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati @Override public UpdateResponse update(UpdateQuery query) { + return this.prepareUpdate(query).execute().actionGet(); + } + + private UpdateRequestBuilder prepareUpdate(UpdateQuery query) { String indexName = isNotBlank(query.getIndexName()) ? query.getIndexName() : getPersistentEntityFor(query.getClazz()).getIndexName(); String type = isNotBlank(query.getType()) ? query.getType() : getPersistentEntityFor(query.getClazz()).getIndexType(); Assert.notNull(indexName, "No index defined for Query"); @@ -454,16 +460,24 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati Assert.notNull(query.getId(), "No Id define for Query"); Assert.notNull(query.getUpdateRequest(), "No IndexRequest define for Query"); UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate(indexName, type, query.getId()); - if (query.DoUpsert()) { - updateRequestBuilder.setDocAsUpsert(true) - .setDoc(query.getUpdateRequest().doc()) + + if(query.getUpdateRequest().script() == null) { + // doc + if (query.DoUpsert()) { + updateRequestBuilder.setDocAsUpsert(true) + .setDoc(query.getUpdateRequest().doc()); + } else { + updateRequestBuilder.setDoc(query.getUpdateRequest().doc()); + } + } else { + // or script + updateRequestBuilder .setScript(query.getUpdateRequest().script(), query.getUpdateRequest().scriptType()) .setScriptParams(query.getUpdateRequest().scriptParams()) .setScriptLang(query.getUpdateRequest().scriptLang()); - } else { - updateRequestBuilder.setDoc(query.getUpdateRequest().doc()); } - return updateRequestBuilder.execute().actionGet(); + + return updateRequestBuilder; } @Override @@ -486,6 +500,26 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati } } + @Override + public void bulkUpdate(List queries) { + BulkRequestBuilder bulkRequest = client.prepareBulk(); + for (UpdateQuery query : queries) { + bulkRequest.add(prepareUpdate(query)); + } + BulkResponse bulkResponse = bulkRequest.execute().actionGet(); + if (bulkResponse.hasFailures()) { + Map failedDocuments = new HashMap(); + for (BulkItemResponse item : bulkResponse.getItems()) { + if (item.isFailed()) + failedDocuments.put(item.getId(), item.getFailureMessage()); + } + throw new ElasticsearchException( + "Bulk indexing has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + + failedDocuments + "]", failedDocuments + ); + } + } + @Override public boolean indexExists(Class clazz) { return indexExists(getPersistentEntityFor(clazz).getIndexName()); diff --git a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java index f214c0eff..dcbaf9dc8 100755 --- a/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java +++ b/src/test/java/org/springframework/data/elasticsearch/core/ElasticsearchTemplateTests.java @@ -15,14 +15,6 @@ */ package org.springframework.data.elasticsearch.core; -import static org.apache.commons.lang.RandomStringUtils.*; -import static org.elasticsearch.index.query.FilterBuilders.*; -import static org.elasticsearch.index.query.QueryBuilders.*; -import static org.hamcrest.Matchers.*; -import static org.junit.Assert.*; - -import java.util.*; - import org.apache.commons.lang.StringUtils; import org.elasticsearch.action.get.MultiGetItemResponse; import org.elasticsearch.action.get.MultiGetResponse; @@ -53,6 +45,15 @@ import org.springframework.data.elasticsearch.entities.SampleMappingEntity; import org.springframework.test.context.ContextConfiguration; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; +import java.util.*; + +import static org.apache.commons.lang.RandomStringUtils.randomNumeric; +import static org.elasticsearch.index.query.FilterBuilders.boolFilter; +import static org.elasticsearch.index.query.FilterBuilders.termFilter; +import static org.elasticsearch.index.query.QueryBuilders.*; +import static org.hamcrest.Matchers.*; +import static org.junit.Assert.*; + /** * @author Rizwan Idrees * @author Mohsin Husen @@ -254,6 +255,41 @@ public class ElasticsearchTemplateTests { assertThat(sampleEntities.getTotalElements(), is(equalTo(2L))); } + + @Test + public void shouldDoBulkUpdate() { + //given + String documentId = randomNumeric(5); + String messageBeforeUpdate = "some test message"; + String messageAfterUpdate = "test message"; + + SampleEntity sampleEntity = new SampleEntityBuilder(documentId) + .message(messageBeforeUpdate) + .version(System.currentTimeMillis()).build(); + + IndexQuery indexQuery = getIndexQuery(sampleEntity); + + elasticsearchTemplate.index(indexQuery); + elasticsearchTemplate.refresh(SampleEntity.class, true); + + IndexRequest indexRequest = new IndexRequest(); + indexRequest.source("message", messageAfterUpdate); + UpdateQuery updateQuery = new UpdateQueryBuilder().withId(documentId) + .withClass(SampleEntity.class).withIndexRequest(indexRequest).build(); + + + List queries = new ArrayList(); + queries.add(updateQuery); + + // when + elasticsearchTemplate.bulkUpdate(queries); + //then + GetQuery getQuery = new GetQuery(); + getQuery.setId(documentId); + SampleEntity indexedEntity = elasticsearchTemplate.queryForObject(getQuery, SampleEntity.class); + assertThat(indexedEntity.getMessage(), is(messageAfterUpdate)); + } + @Test public void shouldDeleteDocumentForGivenId() { // given