DATAES-158 - DATAES-159

This commit is contained in:
XAVIER VANDEMEULEBROUCKE 2015-04-27 16:14:49 +02:00 committed by Artur Konczak
parent 455df72923
commit 7aaf7e1782
3 changed files with 111 additions and 34 deletions

View File

@ -15,16 +15,16 @@
*/
package org.springframework.data.elasticsearch.core;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.elasticsearch.action.update.UpdateResponse;
import org.springframework.data.domain.Page;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.query.*;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
/**
* ElasticsearchOperations
*
@ -323,6 +323,13 @@ public interface ElasticsearchOperations {
*/
void bulkIndex(List<IndexQuery> queries);
/**
* Bulk update all objects. Will do update
*
* @param queries
*/
void bulkUpdate(List<UpdateQuery> queries);
/**
* Delete the one object with provided id
*

View File

@ -15,21 +15,6 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.apache.commons.collections.CollectionUtils.isNotEmpty;
import static org.apache.commons.lang.StringUtils.*;
import static org.elasticsearch.action.search.SearchType.*;
import static org.elasticsearch.client.Requests.*;
import static org.elasticsearch.cluster.metadata.AliasAction.Type.*;
import static org.elasticsearch.common.collect.Sets.*;
import static org.elasticsearch.index.VersionType.*;
import static org.springframework.data.elasticsearch.core.MappingBuilder.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.util.*;
import org.apache.commons.collections.CollectionUtils;
import org.elasticsearch.action.ListenableActionFuture;
import org.elasticsearch.action.admin.cluster.state.ClusterStateRequest;
@ -95,6 +80,23 @@ import org.springframework.data.elasticsearch.core.query.*;
import org.springframework.data.mapping.PersistentProperty;
import org.springframework.util.Assert;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.lang.reflect.Method;
import java.util.*;
import static org.apache.commons.collections.CollectionUtils.isNotEmpty;
import static org.apache.commons.lang.StringUtils.isBlank;
import static org.apache.commons.lang.StringUtils.isNotBlank;
import static org.elasticsearch.action.search.SearchType.SCAN;
import static org.elasticsearch.client.Requests.indicesExistsRequest;
import static org.elasticsearch.client.Requests.refreshRequest;
import static org.elasticsearch.cluster.metadata.AliasAction.Type.ADD;
import static org.elasticsearch.common.collect.Sets.newHashSet;
import static org.elasticsearch.index.VersionType.EXTERNAL;
import static org.springframework.data.elasticsearch.core.MappingBuilder.buildMapping;
/**
* ElasticsearchTemplate
*
@ -447,6 +449,10 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati
@Override
public UpdateResponse update(UpdateQuery query) {
return this.prepareUpdate(query).execute().actionGet();
}
private UpdateRequestBuilder prepareUpdate(UpdateQuery query) {
String indexName = isNotBlank(query.getIndexName()) ? query.getIndexName() : getPersistentEntityFor(query.getClazz()).getIndexName();
String type = isNotBlank(query.getType()) ? query.getType() : getPersistentEntityFor(query.getClazz()).getIndexType();
Assert.notNull(indexName, "No index defined for Query");
@ -454,16 +460,24 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati
Assert.notNull(query.getId(), "No Id define for Query");
Assert.notNull(query.getUpdateRequest(), "No IndexRequest define for Query");
UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate(indexName, type, query.getId());
if(query.getUpdateRequest().script() == null) {
// doc
if (query.DoUpsert()) {
updateRequestBuilder.setDocAsUpsert(true)
.setDoc(query.getUpdateRequest().doc())
.setScript(query.getUpdateRequest().script(), query.getUpdateRequest().scriptType())
.setScriptParams(query.getUpdateRequest().scriptParams())
.setScriptLang(query.getUpdateRequest().scriptLang());
.setDoc(query.getUpdateRequest().doc());
} else {
updateRequestBuilder.setDoc(query.getUpdateRequest().doc());
}
return updateRequestBuilder.execute().actionGet();
} else {
// or script
updateRequestBuilder
.setScript(query.getUpdateRequest().script(), query.getUpdateRequest().scriptType())
.setScriptParams(query.getUpdateRequest().scriptParams())
.setScriptLang(query.getUpdateRequest().scriptLang());
}
return updateRequestBuilder;
}
@Override
@ -486,6 +500,26 @@ public class ElasticsearchTemplate implements ElasticsearchOperations, Applicati
}
}
@Override
public void bulkUpdate(List<UpdateQuery> queries) {
BulkRequestBuilder bulkRequest = client.prepareBulk();
for (UpdateQuery query : queries) {
bulkRequest.add(prepareUpdate(query));
}
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
Map<String, String> failedDocuments = new HashMap<String, String>();
for (BulkItemResponse item : bulkResponse.getItems()) {
if (item.isFailed())
failedDocuments.put(item.getId(), item.getFailureMessage());
}
throw new ElasticsearchException(
"Bulk indexing has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages ["
+ failedDocuments + "]", failedDocuments
);
}
}
@Override
public <T> boolean indexExists(Class<T> clazz) {
return indexExists(getPersistentEntityFor(clazz).getIndexName());

View File

@ -15,14 +15,6 @@
*/
package org.springframework.data.elasticsearch.core;
import static org.apache.commons.lang.RandomStringUtils.*;
import static org.elasticsearch.index.query.FilterBuilders.*;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.*;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
@ -53,6 +45,15 @@ import org.springframework.data.elasticsearch.entities.SampleMappingEntity;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import java.util.*;
import static org.apache.commons.lang.RandomStringUtils.randomNumeric;
import static org.elasticsearch.index.query.FilterBuilders.boolFilter;
import static org.elasticsearch.index.query.FilterBuilders.termFilter;
import static org.elasticsearch.index.query.QueryBuilders.*;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
/**
* @author Rizwan Idrees
* @author Mohsin Husen
@ -254,6 +255,41 @@ public class ElasticsearchTemplateTests {
assertThat(sampleEntities.getTotalElements(), is(equalTo(2L)));
}
@Test
public void shouldDoBulkUpdate() {
//given
String documentId = randomNumeric(5);
String messageBeforeUpdate = "some test message";
String messageAfterUpdate = "test message";
SampleEntity sampleEntity = new SampleEntityBuilder(documentId)
.message(messageBeforeUpdate)
.version(System.currentTimeMillis()).build();
IndexQuery indexQuery = getIndexQuery(sampleEntity);
elasticsearchTemplate.index(indexQuery);
elasticsearchTemplate.refresh(SampleEntity.class, true);
IndexRequest indexRequest = new IndexRequest();
indexRequest.source("message", messageAfterUpdate);
UpdateQuery updateQuery = new UpdateQueryBuilder().withId(documentId)
.withClass(SampleEntity.class).withIndexRequest(indexRequest).build();
List<UpdateQuery> queries = new ArrayList<UpdateQuery>();
queries.add(updateQuery);
// when
elasticsearchTemplate.bulkUpdate(queries);
//then
GetQuery getQuery = new GetQuery();
getQuery.setId(documentId);
SampleEntity indexedEntity = elasticsearchTemplate.queryForObject(getQuery, SampleEntity.class);
assertThat(indexedEntity.getMessage(), is(messageAfterUpdate));
}
@Test
public void shouldDeleteDocumentForGivenId() {
// given