1 | package org.springframework.data.elasticsearch.core; |
2 | |
3 | import org.codehaus.jackson.map.DeserializationConfig; |
4 | import org.codehaus.jackson.map.ObjectMapper; |
5 | import org.elasticsearch.action.bulk.BulkItemResponse; |
6 | import org.elasticsearch.action.bulk.BulkRequestBuilder; |
7 | import org.elasticsearch.action.bulk.BulkResponse; |
8 | import org.elasticsearch.action.count.CountRequestBuilder; |
9 | import org.elasticsearch.action.get.GetResponse; |
10 | import org.elasticsearch.action.index.IndexRequestBuilder; |
11 | import org.elasticsearch.action.search.SearchRequestBuilder; |
12 | import org.elasticsearch.action.search.SearchResponse; |
13 | import org.elasticsearch.client.Client; |
14 | import org.elasticsearch.client.Requests; |
15 | import org.elasticsearch.common.collect.MapBuilder; |
16 | import org.elasticsearch.index.query.QueryBuilder; |
17 | import org.elasticsearch.search.SearchHit; |
18 | import org.elasticsearch.search.sort.SortOrder; |
19 | import org.springframework.data.domain.Page; |
20 | import org.springframework.data.domain.PageImpl; |
21 | import org.springframework.data.domain.Pageable; |
22 | import org.springframework.data.domain.Sort; |
23 | import org.springframework.data.elasticsearch.ElasticsearchException; |
24 | import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter; |
25 | import org.springframework.data.elasticsearch.core.convert.MappingElasticsearchConverter; |
26 | import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity; |
27 | import org.springframework.data.elasticsearch.core.mapping.SimpleElasticsearchMappingContext; |
28 | import org.springframework.data.elasticsearch.core.query.*; |
29 | import org.springframework.util.Assert; |
30 | |
31 | import java.io.IOException; |
32 | import java.util.ArrayList; |
33 | import java.util.HashMap; |
34 | import java.util.List; |
35 | import java.util.Map; |
36 | |
37 | import static org.apache.commons.lang.StringUtils.isBlank; |
38 | import static org.elasticsearch.action.search.SearchType.DFS_QUERY_THEN_FETCH; |
39 | import static org.elasticsearch.client.Requests.indicesExistsRequest; |
40 | import static org.elasticsearch.client.Requests.refreshRequest; |
41 | |
42 | |
43 | public class ElasticsearchTemplate implements ElasticsearchOperations { |
44 | |
45 | private Client client; |
46 | private ElasticsearchConverter elasticsearchConverter; |
47 | |
48 | private ObjectMapper objectMapper = new ObjectMapper(); |
49 | |
50 | { |
51 | objectMapper.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); |
52 | } |
53 | |
54 | public ElasticsearchTemplate(Client client) { |
55 | this(client, null); |
56 | } |
57 | |
58 | public ElasticsearchTemplate(Client client, ElasticsearchConverter elasticsearchConverter) { |
59 | this.client = client; |
60 | this.elasticsearchConverter = (elasticsearchConverter == null)? new MappingElasticsearchConverter(new SimpleElasticsearchMappingContext()) : elasticsearchConverter ; |
61 | } |
62 | |
63 | |
64 | @Override |
65 | public <T> boolean createIndex(Class<T> clazz) { |
66 | ElasticsearchPersistentEntity<T> persistentEntity = getPersistentEntityFor(clazz); |
67 | return createIndexIfNotCreated(persistentEntity.getIndexName()); |
68 | } |
69 | |
70 | @Override |
71 | public ElasticsearchConverter getElasticsearchConverter() { |
72 | return elasticsearchConverter; |
73 | } |
74 | |
75 | @Override |
76 | public <T> T queryForObject(GetQuery query, Class<T> clazz) { |
77 | ElasticsearchPersistentEntity<T> persistentEntity = getPersistentEntityFor(clazz); |
78 | GetResponse response = client.prepareGet(persistentEntity.getIndexName(), persistentEntity.getIndexType(), query.getId()) |
79 | .execute().actionGet(); |
80 | return mapResult(response.getSourceAsString(), clazz); |
81 | } |
82 | |
83 | @Override |
84 | public <T> T queryForObject(CriteriaQuery query, Class<T> clazz) { |
85 | Page<T> page = queryForPage(query,clazz); |
86 | Assert.isTrue(page.getTotalElements() < 2, "Expected 1 but found "+ page.getTotalElements() +" results"); |
87 | return page.getTotalElements() > 0? page.getContent().get(0) : null; |
88 | } |
89 | |
90 | @Override |
91 | public <T> T queryForObject(StringQuery query, Class<T> clazz) { |
92 | Page<T> page = queryForPage(query,clazz); |
93 | Assert.isTrue(page.getTotalElements() < 2, "Expected 1 but found "+ page.getTotalElements() +" results"); |
94 | return page.getTotalElements() > 0? page.getContent().get(0) : null; |
95 | } |
96 | |
97 | @Override |
98 | public <T> Page<T> queryForPage(SearchQuery query, Class<T> clazz) { |
99 | SearchRequestBuilder searchRequestBuilder = prepareSearch(query,clazz); |
100 | if(query.getElasticsearchFilter() != null){ |
101 | searchRequestBuilder.setFilter(query.getElasticsearchFilter()); |
102 | } |
103 | SearchResponse response = searchRequestBuilder.setQuery(query.getElasticsearchQuery()).execute().actionGet(); |
104 | return mapResults(response, clazz, query.getPageable()); |
105 | } |
106 | |
107 | @Override |
108 | public <T> Page<T> queryForPage(CriteriaQuery query, Class<T> clazz) { |
109 | QueryBuilder elasticsearchQuery = new CriteriaQueryProcessor().createQueryFromCriteria(query.getCriteria()); |
110 | SearchResponse response = prepareSearch(query,clazz) |
111 | .setQuery(elasticsearchQuery) |
112 | .execute().actionGet(); |
113 | return mapResults(response, clazz, query.getPageable()); |
114 | } |
115 | |
116 | @Override |
117 | public <T> Page<T> queryForPage(StringQuery query, Class<T> clazz) { |
118 | SearchResponse response = prepareSearch(query,clazz) |
119 | .setQuery(query.getSource()) |
120 | .execute().actionGet(); |
121 | return mapResults(response, clazz, query.getPageable()); |
122 | } |
123 | |
124 | @Override |
125 | public <T> long count(SearchQuery query, Class<T> clazz) { |
126 | ElasticsearchPersistentEntity<T> persistentEntity = getPersistentEntityFor(clazz); |
127 | CountRequestBuilder countRequestBuilder = client.prepareCount(persistentEntity.getIndexName()) |
128 | .setTypes(persistentEntity.getIndexType()); |
129 | if(query.getElasticsearchQuery() != null){ |
130 | countRequestBuilder.setQuery(query.getElasticsearchQuery()); |
131 | } |
132 | return countRequestBuilder.execute().actionGet().count(); |
133 | } |
134 | |
135 | @Override |
136 | public String index(IndexQuery query) { |
137 | return prepareIndex(query) |
138 | .execute() |
139 | .actionGet().getId(); |
140 | } |
141 | |
142 | @Override |
143 | public void bulkIndex(List<IndexQuery> queries) { |
144 | BulkRequestBuilder bulkRequest = client.prepareBulk(); |
145 | for(IndexQuery query : queries){ |
146 | bulkRequest.add(prepareIndex(query)); |
147 | } |
148 | BulkResponse bulkResponse = bulkRequest.execute().actionGet(); |
149 | if (bulkResponse.hasFailures()) { |
150 | Map<String, String> failedDocuments = new HashMap<String, String>(); |
151 | for (BulkItemResponse item : bulkResponse.items()) { |
152 | if (item.failed()) |
153 | failedDocuments.put(item.getId(), item.failureMessage()); |
154 | } |
155 | throw new ElasticsearchException("Bulk indexing has failures. Use ElasticsearchException.getFailedDocuments() for detailed messages [" + failedDocuments+"]", failedDocuments); |
156 | } |
157 | } |
158 | |
159 | @Override |
160 | public String delete(String indexName, String type, String id) { |
161 | return client.prepareDelete(indexName, type, id) |
162 | .execute().actionGet().getId(); |
163 | } |
164 | |
165 | @Override |
166 | public <T> String delete(Class<T> clazz, String id) { |
167 | ElasticsearchPersistentEntity persistentEntity = getPersistentEntityFor(clazz); |
168 | return delete(persistentEntity.getIndexName(), persistentEntity.getIndexType(), id); |
169 | } |
170 | |
171 | @Override |
172 | public <T> void delete(DeleteQuery query, Class<T> clazz) { |
173 | ElasticsearchPersistentEntity persistentEntity = getPersistentEntityFor(clazz); |
174 | client.prepareDeleteByQuery(persistentEntity.getIndexName()) |
175 | .setTypes(persistentEntity.getIndexType()) |
176 | .setQuery(query.getElasticsearchQuery()) |
177 | .execute().actionGet(); |
178 | } |
179 | |
180 | private boolean createIndexIfNotCreated(String indexName) { |
181 | return indexExists(indexName) || createIndex(indexName); |
182 | } |
183 | |
184 | private boolean indexExists(String indexName) { |
185 | return client.admin() |
186 | .indices() |
187 | .exists(indicesExistsRequest(indexName)).actionGet().exists(); |
188 | } |
189 | |
190 | private boolean createIndex(String indexName) { |
191 | return client.admin().indices().create(Requests.createIndexRequest(indexName). |
192 | settings(new MapBuilder<String, String>().put("index.refresh_interval", "-1").map())).actionGet().acknowledged(); |
193 | } |
194 | |
195 | private <T> SearchRequestBuilder prepareSearch(Query query, Class<T> clazz){ |
196 | int startRecord=0; |
197 | if(query.getPageable() != null){ |
198 | startRecord = ((query.getPageable().getPageNumber() - 1) * query.getPageable().getPageSize()); |
199 | } |
200 | ElasticsearchPersistentEntity persistentEntity = getPersistentEntityFor(clazz); |
201 | SearchRequestBuilder searchRequestBuilder = client.prepareSearch(persistentEntity.getIndexName()) |
202 | .setSearchType(DFS_QUERY_THEN_FETCH) |
203 | .setTypes(persistentEntity.getIndexType()) |
204 | .setFrom(startRecord < 0 ? 0 : startRecord) |
205 | .setSize(query.getPageable() != null ? query.getPageable().getPageSize() : 10); |
206 | |
207 | if(query.getSort() != null){ |
208 | for(Sort.Order order : query.getSort()){ |
209 | searchRequestBuilder.addSort(order.getProperty(), order.getDirection() == Sort.Direction.DESC? SortOrder.DESC : SortOrder.ASC); |
210 | } |
211 | } |
212 | return searchRequestBuilder; |
213 | } |
214 | |
215 | private IndexRequestBuilder prepareIndex(IndexQuery query){ |
216 | try { |
217 | ElasticsearchPersistentEntity persistentEntity = getPersistentEntityFor(query.getObject().getClass()); |
218 | return client.prepareIndex(persistentEntity.getIndexName(), persistentEntity.getIndexType(), query.getId()) |
219 | .setSource(objectMapper.writeValueAsString(query.getObject())); |
220 | } catch (IOException e) { |
221 | throw new ElasticsearchException("failed to index the document [id: " + query.getId() +"]",e); |
222 | } |
223 | } |
224 | |
225 | public void refresh(String indexName, boolean waitForOperation) { |
226 | client.admin().indices() |
227 | .refresh(refreshRequest(indexName).waitForOperations(waitForOperation)).actionGet(); |
228 | } |
229 | |
230 | public <T> void refresh(Class<T> clazz, boolean waitForOperation) { |
231 | ElasticsearchPersistentEntity persistentEntity = getPersistentEntityFor(clazz); |
232 | client.admin().indices() |
233 | .refresh(refreshRequest(persistentEntity.getIndexName()).waitForOperations(waitForOperation)).actionGet(); |
234 | } |
235 | |
236 | private ElasticsearchPersistentEntity getPersistentEntityFor(Class clazz){ |
237 | return elasticsearchConverter.getMappingContext().getPersistentEntity(clazz); |
238 | } |
239 | |
240 | private <T> Page<T> mapResults(SearchResponse response, final Class<T> elementType,final Pageable pageable){ |
241 | ResultsMapper<T> resultsMapper = new ResultsMapper<T>(){ |
242 | @Override |
243 | public Page<T> mapResults(SearchResponse response) { |
244 | long totalHits = response.getHits().totalHits(); |
245 | List<T> results = new ArrayList<T>(); |
246 | for (SearchHit hit : response.getHits()) { |
247 | if (hit != null) { |
248 | results.add(mapResult(hit.sourceAsString(), elementType)); |
249 | } |
250 | } |
251 | return new PageImpl<T>(results, pageable, totalHits); |
252 | } |
253 | }; |
254 | return resultsMapper.mapResults(response); |
255 | } |
256 | |
257 | private <T> T mapResult(String source, Class<T> clazz){ |
258 | if(isBlank(source)){ |
259 | return null; |
260 | } |
261 | try { |
262 | return objectMapper.readValue(source, clazz); |
263 | } catch (IOException e) { |
264 | throw new ElasticsearchException("failed to map source [ " + source + "] to class " + clazz.getSimpleName() , e); |
265 | } |
266 | } |
267 | } |