Fix reactive blocking calls.

Original Pull Request #1825 
Closes #1824
This commit is contained in:
Peter-Josef Meisch 2021-05-22 17:16:16 +02:00 committed by GitHub
parent e8f73b75ba
commit 7582617a26
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 282 additions and 99 deletions

View File

@ -101,7 +101,10 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
this.routingResolver = new DefaultRoutingResolver((SimpleElasticsearchMappingContext) mappingContext);
requestFactory = new RequestFactory(elasticsearchConverter);
VersionInfo.logVersions(getClusterVersion());
// initialize the VersionInfo class in the initialization phase
// noinspection ResultOfMethodCallIgnored
VersionInfo.versionProperties();
}
/**
@ -166,6 +169,16 @@ public abstract class AbstractElasticsearchTemplate implements ElasticsearchOper
public RefreshPolicy getRefreshPolicy() {
return refreshPolicy;
}
/**
* logs the versions of the different Elasticsearch components.
*
* @since 4.3
*/
public void logVersions() {
VersionInfo.logVersions(getClusterVersion());
}
// endregion
// region DocumentOperations

View File

@ -49,8 +49,8 @@ import org.springframework.data.elasticsearch.core.index.AliasData;
import org.springframework.data.elasticsearch.core.index.DeleteTemplateRequest;
import org.springframework.data.elasticsearch.core.index.ExistsTemplateRequest;
import org.springframework.data.elasticsearch.core.index.GetTemplateRequest;
import org.springframework.data.elasticsearch.core.index.MappingBuilder;
import org.springframework.data.elasticsearch.core.index.PutTemplateRequest;
import org.springframework.data.elasticsearch.core.index.ReactiveMappingBuilder;
import org.springframework.data.elasticsearch.core.index.Settings;
import org.springframework.data.elasticsearch.core.index.TemplateData;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
@ -190,8 +190,7 @@ class DefaultReactiveIndexOperations implements ReactiveIndexOperations {
}
}
String mapping = new MappingBuilder(converter).buildPropertyMapping(clazz);
return Mono.just(Document.parse(mapping));
return new ReactiveMappingBuilder(converter).buildReactivePropertyMapping(clazz).map(Document::parse);
}
@Override

View File

@ -142,7 +142,9 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
this.operations = new EntityOperations(this.mappingContext);
this.requestFactory = new RequestFactory(converter);
logVersions();
// initialize the VersionInfo class in the initialization phase
// noinspection ResultOfMethodCallIgnored
VersionInfo.versionProperties();
}
private ReactiveElasticsearchTemplate copy() {
@ -155,11 +157,14 @@ public class ReactiveElasticsearchTemplate implements ReactiveElasticsearchOpera
return copy;
}
private void logVersions() {
getClusterVersion() //
.doOnSuccess(VersionInfo::logVersions) //
.doOnError(e -> VersionInfo.logVersions(null)) //
.subscribe();
/**
* logs the versions of the different Elasticsearch components.
*
* @return a Mono signalling finished execution
* @since 4.3
*/
public Mono<Void> logVersions() {
return getClusterVersion().doOnNext(VersionInfo::logVersions).then();
}
@Override

View File

@ -63,7 +63,7 @@ public abstract class ReactiveResourceUtil {
String line;
while ((line = br.readLine()) != null) {
sb.append(line);
sb.append(line).append('\n');
}
sink.next(sb.toString());

View File

@ -97,7 +97,7 @@ public class MappingBuilder {
private static final String DYNAMIC_DATE_FORMATS = "dynamic_date_formats";
private static final String RUNTIME = "runtime";
private final ElasticsearchConverter elasticsearchConverter;
protected final ElasticsearchConverter elasticsearchConverter;
private boolean writeTypeHints = true;
@ -113,9 +113,16 @@ public class MappingBuilder {
*/
public String buildPropertyMapping(Class<?> clazz) throws MappingException {
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
.getRequiredPersistentEntity(clazz);
return buildPropertyMapping(entity, getRuntimeFields(entity));
}
protected String buildPropertyMapping(ElasticsearchPersistentEntity<?> entity,
@Nullable org.springframework.data.elasticsearch.core.document.Document runtimeFields) {
try {
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
.getRequiredPersistentEntity(clazz);
writeTypeHints = entity.writeTypeHints();
@ -124,7 +131,8 @@ public class MappingBuilder {
// Dynamic templates
addDynamicTemplatesMapping(builder, entity);
mapEntity(builder, entity, true, "", false, FieldType.Auto, null, entity.findAnnotation(DynamicMapping.class));
mapEntity(builder, entity, true, "", false, FieldType.Auto, null, entity.findAnnotation(DynamicMapping.class),
runtimeFields);
builder.endObject() // root object
.close();
@ -148,7 +156,8 @@ public class MappingBuilder {
private void mapEntity(XContentBuilder builder, @Nullable ElasticsearchPersistentEntity<?> entity,
boolean isRootObject, String nestedObjectFieldName, boolean nestedOrObjectField, FieldType fieldType,
@Nullable Field parentFieldAnnotation, @Nullable DynamicMapping dynamicMapping) throws IOException {
@Nullable Field parentFieldAnnotation, @Nullable DynamicMapping dynamicMapping,
@Nullable org.springframework.data.elasticsearch.core.document.Document runtimeFields) throws IOException {
if (entity != null && entity.isAnnotationPresent(Mapping.class)) {
Mapping mappingAnnotation = entity.getRequiredAnnotation(Mapping.class);
@ -170,8 +179,8 @@ public class MappingBuilder {
builder.field(DYNAMIC_DATE_FORMATS, mappingAnnotation.dynamicDateFormats());
}
if (StringUtils.hasText(mappingAnnotation.runtimeFieldsPath())) {
addRuntimeFields(builder, mappingAnnotation.runtimeFieldsPath());
if (runtimeFields != null) {
builder.field(RUNTIME, runtimeFields);
}
}
@ -227,13 +236,22 @@ public class MappingBuilder {
}
private void addRuntimeFields(XContentBuilder builder, String runtimeFieldsPath) throws IOException {
@Nullable
private org.springframework.data.elasticsearch.core.document.Document getRuntimeFields(
@Nullable ElasticsearchPersistentEntity<?> entity) {
ClassPathResource runtimeFields = new ClassPathResource(runtimeFieldsPath);
if (entity != null) {
Mapping mappingAnnotation = entity.findAnnotation(Mapping.class);
if (mappingAnnotation != null) {
String runtimeFieldsPath = mappingAnnotation.runtimeFieldsPath();
if (runtimeFields.exists()) {
builder.rawField(RUNTIME, runtimeFields.getInputStream(), XContentType.JSON);
if (hasText(runtimeFieldsPath)) {
String jsonString = ResourceUtil.readFileFromClasspath(runtimeFieldsPath);
return org.springframework.data.elasticsearch.core.document.Document.parse(jsonString);
}
}
}
return null;
}
private void buildPropertyMapping(XContentBuilder builder, boolean isRootObject,
@ -291,7 +309,7 @@ public class MappingBuilder {
: null;
mapEntity(builder, persistentEntity, false, property.getFieldName(), true, fieldAnnotation.type(),
fieldAnnotation, dynamicMapping);
fieldAnnotation, dynamicMapping, null);
return;
}
}

View File

@ -0,0 +1,78 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.core.index;
import static org.springframework.util.StringUtils.*;
import reactor.core.publisher.Mono;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.core.ReactiveResourceUtil;
import org.springframework.data.elasticsearch.core.convert.ElasticsearchConverter;
import org.springframework.data.elasticsearch.core.document.Document;
import org.springframework.data.elasticsearch.core.mapping.ElasticsearchPersistentEntity;
import org.springframework.data.mapping.MappingException;
import org.springframework.lang.Nullable;
/**
* Subclass of {@link MappingBuilder} with specialized methods TO inhibit blocking CALLS
*
* @author Peter-Josef Meisch
* @since 4.3
*/
public class ReactiveMappingBuilder extends MappingBuilder {
public ReactiveMappingBuilder(ElasticsearchConverter elasticsearchConverter) {
super(elasticsearchConverter);
}
@Override
public String buildPropertyMapping(Class<?> clazz) throws MappingException {
throw new UnsupportedOperationException(
"Use ReactiveMappingBuilder.buildReactivePropertyMapping() instead of buildPropertyMapping()");
}
public Mono<String> buildReactivePropertyMapping(Class<?> clazz) throws MappingException {
ElasticsearchPersistentEntity<?> entity = elasticsearchConverter.getMappingContext()
.getRequiredPersistentEntity(clazz);
return getRuntimeFields(entity) //
.switchIfEmpty(Mono.just(Document.create())) //
.map(document -> {
if (document.isEmpty()) {
return buildPropertyMapping(entity, null);
} else {
return buildPropertyMapping(entity, document);
}
});
}
private Mono<Document> getRuntimeFields(@Nullable ElasticsearchPersistentEntity<?> entity) {
if (entity != null) {
Mapping mappingAnnotation = entity.findAnnotation(Mapping.class);
if (mappingAnnotation != null) {
String runtimeFieldsPath = mappingAnnotation.runtimeFieldsPath();
if (hasText(runtimeFieldsPath)) {
return ReactiveResourceUtil.readFileFromClasspath(runtimeFieldsPath).map(Document::parse);
}
}
}
return Mono.empty();
}
}

View File

@ -15,9 +15,9 @@
*/
package org.springframework.data.elasticsearch.support;
import java.io.IOException;
import java.io.InputStream;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import org.elasticsearch.Version;
import org.slf4j.Logger;
@ -35,70 +35,76 @@ import org.springframework.lang.Nullable;
public final class VersionInfo {
private static final Logger LOG = LoggerFactory.getLogger(VersionInfo.class);
private static final AtomicBoolean initialized = new AtomicBoolean(false);
private static final String VERSION_PROPERTIES = "versions.properties";
protected static final String VERSION_PROPERTIES = "versions.properties";
public static final String VERSION_SPRING_DATA_ELASTICSEARCH = "version.spring-data-elasticsearch";
public static final String VERSION_ELASTICSEARCH_CLIENT = "version.elasticsearch-client";
/**
* logs the relevant version info the first time it is called. Does nothing after the first call
*
* @param clusterVersion the version of the cluster
*/
public static void logVersions(@Nullable String clusterVersion) {
if (!initialized.getAndSet(true)) {
try {
InputStream resource = VersionInfo.class.getClassLoader().getResourceAsStream(VERSION_PROPERTIES);
if (resource != null) {
Properties properties = new Properties();
properties.load(resource);
private static Properties versionProperties;
String versionSpringDataElasticsearch = properties.getProperty(VERSION_SPRING_DATA_ELASTICSEARCH);
Version versionESBuilt = Version.fromString(properties.getProperty(VERSION_ELASTICSEARCH_CLIENT));
Version versionESUsed = Version.CURRENT;
Version versionESCluster = clusterVersion != null ? Version.fromString(clusterVersion) : null;
LOG.info("Version Spring Data Elasticsearch: {}", versionSpringDataElasticsearch.toString());
LOG.info("Version Elasticsearch Client in build: {}", versionESBuilt.toString());
LOG.info("Version Elasticsearch Client used: {}", versionESUsed.toString());
if (differInMajorOrMinor(versionESBuilt, versionESUsed)) {
LOG.warn("Version mismatch in between Elasticsearch Clients build/use: {} - {}", versionESBuilt,
versionESUsed);
}
if (versionESCluster != null) {
LOG.info("Version Elasticsearch cluster: {}", versionESCluster.toString());
if (differInMajorOrMinor(versionESUsed, versionESCluster)) {
LOG.warn("Version mismatch in between Elasticsearch Client and Cluster: {} - {}", versionESUsed,
versionESCluster);
}
}
} else {
LOG.warn("cannot load {}", VERSION_PROPERTIES);
}
} catch (Exception e) {
LOG.warn("Could not log version info: {} - {}", e.getClass().getSimpleName(), e.getMessage());
}
public static Properties versionProperties() {
return versionProperties;
}
static {
try {
versionProperties = loadVersionProperties();
} catch (IOException e) {
LOG.error("Could not load {}", VERSION_PROPERTIES, e);
versionProperties = new Properties();
versionProperties.put(VERSION_SPRING_DATA_ELASTICSEARCH, "0.0.0");
versionProperties.put(VERSION_ELASTICSEARCH_CLIENT, "0.0.0");
}
}
public static Properties versionProperties() throws Exception {
/**
* logs the relevant version info.
*
* @param clusterVersion the version of the cluster
*/
public static void logVersions(@Nullable String clusterVersion) {
try {
InputStream resource = VersionInfo.class.getClassLoader().getResourceAsStream(VERSION_PROPERTIES);
if (resource != null) {
Properties properties = new Properties();
properties.load(resource);
return properties;
} else {
throw new IllegalStateException("Resource not found");
String versionSpringDataElasticsearch = versionProperties.getProperty(VERSION_SPRING_DATA_ELASTICSEARCH);
Version versionESBuilt = Version.fromString(versionProperties.getProperty(VERSION_ELASTICSEARCH_CLIENT));
Version versionESUsed = Version.CURRENT;
Version versionESCluster = clusterVersion != null ? Version.fromString(clusterVersion) : null;
LOG.info("Version Spring Data Elasticsearch: {}", versionSpringDataElasticsearch.toString());
LOG.info("Version Elasticsearch Client in build: {}", versionESBuilt.toString());
LOG.info("Version Elasticsearch Client used: {}", versionESUsed.toString());
if (differInMajorOrMinor(versionESBuilt, versionESUsed)) {
LOG.warn("Version mismatch in between Elasticsearch Clients build/use: {} - {}", versionESBuilt, versionESUsed);
}
if (versionESCluster != null) {
LOG.info("Version Elasticsearch cluster: {}", versionESCluster.toString());
if (differInMajorOrMinor(versionESUsed, versionESCluster)) {
LOG.warn("Version mismatch in between Elasticsearch Client and Cluster: {} - {}", versionESUsed,
versionESCluster);
}
}
} catch (Exception e) {
LOG.error("Could not load {}", VERSION_PROPERTIES, e);
throw e;
LOG.warn("Could not log version info: {} - {}", e.getClass().getSimpleName(), e.getMessage());
}
}
/**
* gets the version properties from the classpath resource.
*
* @return version properties
* @throws IOException when an error occurs
*/
private static Properties loadVersionProperties() throws IOException {
InputStream resource = VersionInfo.class.getClassLoader().getResourceAsStream(VERSION_PROPERTIES);
if (resource != null) {
Properties properties = new Properties();
properties.load(resource);
return properties;
} else {
throw new IllegalStateException("Resource not found");
}
}

View File

@ -16,23 +16,26 @@
package org.springframework.data.elasticsearch.blockhound;
import reactor.blockhound.BlockHound;
import reactor.blockhound.BlockingOperationError;
import reactor.blockhound.integration.BlockHoundIntegration;
import org.elasticsearch.Build;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.springframework.data.elasticsearch.support.VersionInfo;
/**
* @author Peter-Josef Meisch
*/
public class BlockHoundIntegrationCustomizer implements BlockHoundIntegration {
@Override
public void applyTo(BlockHound.Builder builder) {
// Elasticsearch classes reading from the classpath on initialization, needed for parsing Elasticsearch responses
builder.allowBlockingCallsInside(XContentBuilder.class.getName(), "<clinit>")
.allowBlockingCallsInside(Build.class.getName(), "<clinit>");
// Spring Data Elasticsearch classes reading from the classpath
builder.allowBlockingCallsInside(VersionInfo.class.getName(), "logVersions");
builder.blockingMethodCallback(it -> {
throw new BlockingOperationError(it);
});
}
}

View File

@ -74,10 +74,6 @@ public class ReactiveElasticsearchTemplateUnitTests {
@BeforeEach
public void setUp() {
when(client.info()).thenReturn(Mono.just(new MainResponse("mockNodename", org.elasticsearch.Version.CURRENT,
new ClusterName("mockCluster"), "mockUuid", null)));
template = new ReactiveElasticsearchTemplate(client);
}

View File

@ -15,43 +15,67 @@
*/
package org.springframework.data.elasticsearch.core.index;
import static org.skyscreamer.jsonassert.JSONAssert.*;
import static org.springframework.data.elasticsearch.annotations.FieldType.*;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import java.time.Instant;
import org.json.JSONException;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.annotation.Id;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.Mapping;
import org.springframework.data.elasticsearch.core.ReactiveElasticsearchOperations;
import org.springframework.data.elasticsearch.core.ReactiveIndexOperations;
import org.springframework.data.elasticsearch.junit.jupiter.ReactiveElasticsearchRestTemplateConfiguration;
import org.springframework.data.elasticsearch.junit.jupiter.SpringIntegrationTest;
import org.springframework.data.elasticsearch.core.MappingContextBaseTests;
import org.springframework.lang.Nullable;
import org.springframework.test.context.ContextConfiguration;
/**
* @author Peter-Josef Meisch
*/
@SpringIntegrationTest
@ContextConfiguration(classes = { ReactiveElasticsearchRestTemplateConfiguration.class })
public class ReactiveMappingBuilderIntegrationTests {
public class ReactiveMappingBuilderTests extends MappingContextBaseTests {
@Autowired private ReactiveElasticsearchOperations operations;
ReactiveMappingBuilder getReactiveMappingBuilder() {
return new ReactiveMappingBuilder(elasticsearchConverter.get());
}
@Test // #1822
@Test // #1822, #1824
@DisplayName("should write runtime fields")
void shouldWriteRuntimeFields() {
void shouldWriteRuntimeFields() throws JSONException {
ReactiveIndexOperations indexOps = operations.indexOps(RuntimeFieldEntity.class);
ReactiveMappingBuilder mappingBuilder = getReactiveMappingBuilder();
indexOps.create().block();
indexOps.putMapping().block();
indexOps.delete().block();
String expected = "{\n" + //
" \"runtime\": {\n" + //
" \"day_of_week\": {\n" + //
" \"type\": \"keyword\",\n" + //
" \"script\": {\n" + //
" \"source\": \"emit(doc['@timestamp'].value.dayOfWeekEnum.getDisplayName(TextStyle.FULL, Locale.ROOT))\"\n"
+ //
" }\n" + //
" }\n" + //
" },\n" + //
" \"properties\": {\n" + //
" \"_class\": {\n" + //
" \"type\": \"keyword\",\n" + //
" \"index\": false,\n" + //
" \"doc_values\": false\n" + //
" },\n" + //
" \"@timestamp\": {\n" + //
" \"type\": \"date\",\n" + //
" \"format\": \"epoch_millis\"\n" + //
" }\n" + //
" }\n" + //
"}\n"; //
String mapping = Mono.defer(() -> mappingBuilder.buildReactivePropertyMapping(RuntimeFieldEntity.class))
.subscribeOn(Schedulers.parallel()).block();
assertEquals(expected, mapping, true);
}
// region entities

View File

@ -0,0 +1,41 @@
/*
* Copyright 2021 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.springframework.data.elasticsearch.support;
import static org.assertj.core.api.Assertions.*;
import java.io.IOException;
import java.util.Properties;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
/**
* @author Peter-Josef Meisch
*/
class VersionInfoTest {
@Test // #1824
@DisplayName("should read version properties")
void shouldReadVersionProperties() throws IOException {
Properties properties = VersionInfo.versionProperties();
assertThat(properties).isNotNull();
assertThat(properties.getProperty("version.spring-data-elasticsearch")).isNotNull();
assertThat(properties.getProperty("version.elasticsearch-client")).isNotNull();
}
}