DATAES-16 : Add Partial Update / Upsert support

This commit is contained in:
Mohsin Husen 2013-07-30 12:11:59 +01:00
parent 2d6750abdf
commit 096c2e197e
5 changed files with 245 additions and 1 deletions

View File

@ -15,6 +15,7 @@
*/
package org.springframework.data.elasticsearch.core;
import org.elasticsearch.action.update.UpdateResponse;
import org.springframework.data.domain.Page;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.query.*;
@ -158,6 +159,14 @@ public interface ElasticsearchOperations {
*/
String index(IndexQuery query);
/**
* Partial update of the document
*
* @param updateQuery
* @return
*/
UpdateResponse update(UpdateQuery updateQuery);
/**
* Bulk index all objects. Will do save or update
*

View File

@ -29,6 +29,8 @@ import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.mlt.MoreLikeThisRequestBuilder;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.update.UpdateRequestBuilder;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.Client;
import org.elasticsearch.client.Requests;
import org.elasticsearch.common.collect.MapBuilder;
@ -99,7 +101,6 @@ public class ElasticsearchTemplate implements ElasticsearchOperations {
@Override
public <T> boolean createIndex(Class<T> clazz) {
ElasticsearchPersistentEntity<T> persistentEntity = getPersistentEntityFor(clazz);
return createIndexIfNotCreated(clazz);
}
@ -207,6 +208,25 @@ public class ElasticsearchTemplate implements ElasticsearchOperations {
return prepareIndex(query).execute().actionGet().getId();
}
@Override
public UpdateResponse update(UpdateQuery query) {
String indexName = isNotBlank(query.getIndexName()) ? query.getIndexName() : getPersistentEntityFor(query.getClazz()).getIndexName();
String type = isNotBlank(query.getType()) ? query.getType() : getPersistentEntityFor(query.getClazz()).getIndexType();
Assert.notNull(indexName, "No index defined for Query");
Assert.notNull(type, "No type define for Query");
Assert.notNull(query.getId(), "No Id define for Query");
Assert.notNull(query.getIndexRequest(), "No IndexRequest define for Query");
UpdateRequestBuilder updateRequestBuilder = client.prepareUpdate(indexName, type, query.getId());
if(query.DoUpsert()){
updateRequestBuilder.setDocAsUpsert(true)
.setUpsertRequest(query.getIndexRequest()).setDoc(query.getIndexRequest());
} else {
updateRequestBuilder.setDoc(query.getIndexRequest());
}
return updateRequestBuilder.execute().actionGet();
}
@Override
public void bulkIndex(List<IndexQuery> queries) {
BulkRequestBuilder bulkRequest = client.prepareBulk();

View File

@ -0,0 +1,80 @@
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.query;
import org.elasticsearch.action.index.IndexRequest;
/**
* @author Rizwan Idrees
* @author Mohsin Husen
*/
public class UpdateQuery {
private String id;
private IndexRequest indexRequest;
private String indexName;
private String type;
private Class clazz;
private boolean doUpsert;
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}
public IndexRequest getIndexRequest() {
return indexRequest;
}
public void setIndexRequest(IndexRequest indexRequest) {
this.indexRequest = indexRequest;
}
public String getIndexName() {
return indexName;
}
public void setIndexName(String indexName) {
this.indexName = indexName;
}
public String getType() {
return type;
}
public void setType(String type) {
this.type = type;
}
public Class getClazz() {
return clazz;
}
public void setClazz(Class clazz) {
this.clazz = clazz;
}
public boolean DoUpsert() {
return doUpsert;
}
public void setDoUpsert(boolean doUpsert) {
this.doUpsert = doUpsert;
}
}

View File

@ -0,0 +1,73 @@
/*
* Copyright 2013 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.query;
import org.elasticsearch.action.index.IndexRequest;
/**
* @author Rizwan Idrees
* @author Mohsin Husen
*/
public class UpdateQueryBuilder {
private String id;
private IndexRequest indexRequest;
private String indexName;
private String type;
private Class clazz;
private boolean doUpsert;
public UpdateQueryBuilder withId(String id){
this.id = id;
return this;
}
public UpdateQueryBuilder withIndexRequest(IndexRequest indexRequest){
this.indexRequest = indexRequest;
return this;
}
public UpdateQueryBuilder withIndexName(String indexName){
this.indexName = indexName;
return this;
}
public UpdateQueryBuilder withType(String type){
this.type = type;
return this;
}
public UpdateQueryBuilder withClass(Class clazz){
this.clazz = clazz;
return this;
}
public UpdateQueryBuilder withDoUpsert(boolean doUpsert){
this.doUpsert = doUpsert;
return this;
}
public UpdateQuery build(){
UpdateQuery updateQuery = new UpdateQuery();
updateQuery.setId(id);
updateQuery.setIndexName(indexName);
updateQuery.setType(type);
updateQuery.setClazz(clazz);
updateQuery.setIndexRequest(indexRequest);
updateQuery.setDoUpsert(doUpsert);
return updateQuery;
}
}

View File

@ -15,7 +15,9 @@
*/
package org.springframework.data.elasticsearch.core;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.index.engine.DocumentMissingException;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortOrder;
@ -716,4 +718,64 @@ public class ElasticsearchTemplateTests {
assertThat(elasticsearchTemplate.indexExists(clazz), is(false));
}
@Test
public void shouldDoPartialUpdateForExistingDocument() {
//given
String documentId = randomNumeric(5);
String messageBeforeUpdate = "some test message";
String messageAfterUpdate = "test message";
SampleEntity sampleEntity = new SampleEntity();
sampleEntity.setId(documentId);
sampleEntity.setMessage(messageBeforeUpdate);
sampleEntity.setVersion(System.currentTimeMillis());
IndexQuery indexQuery = new IndexQuery();
indexQuery.setId(documentId);
indexQuery.setObject(sampleEntity);
elasticsearchTemplate.index(indexQuery);
elasticsearchTemplate.refresh(SampleEntity.class, true);
IndexRequest indexRequest = new IndexRequest();
indexRequest.source("message", messageAfterUpdate);
UpdateQuery updateQuery = new UpdateQueryBuilder().withId(documentId)
.withClass(SampleEntity.class).withIndexRequest(indexRequest).build();
// when
elasticsearchTemplate.update(updateQuery);
//then
GetQuery getQuery = new GetQuery();
getQuery.setId(documentId);
SampleEntity indexedEntity = elasticsearchTemplate.queryForObject(getQuery, SampleEntity.class);
assertThat(indexedEntity.getMessage(), is(messageAfterUpdate));
}
@Test(expected = DocumentMissingException.class)
public void shouldThrowExceptionIfDocumentDoesNotExistWhileDoingPartialUpdate(){
// when
IndexRequest indexRequest = new IndexRequest();
UpdateQuery updateQuery = new UpdateQueryBuilder().withId(randomNumeric(5))
.withClass(SampleEntity.class).withIndexRequest(indexRequest).build();
elasticsearchTemplate.update(updateQuery);
}
@Test
public void shouldDoUpsertIfDocumentDoesNotExist(){
//given
String documentId = randomNumeric(5);
String message = "test message";
IndexRequest indexRequest = new IndexRequest();
indexRequest.source("message", message);
UpdateQuery updateQuery = new UpdateQueryBuilder().withId(documentId)
.withDoUpsert(true).withClass(SampleEntity.class)
.withIndexRequest(indexRequest).build();
//when
elasticsearchTemplate.update(updateQuery);
//then
GetQuery getQuery = new GetQuery();
getQuery.setId(documentId);
SampleEntity indexedEntity = elasticsearchTemplate.queryForObject(getQuery, SampleEntity.class);
assertThat(indexedEntity.getMessage(), is(message));
}
}