Simplify and Speed up some Compression Usage (#60953) (#61008)

Use thread-local buffers and deflater and inflater instances to speed up compressing and
decompressing in-memory bytes.
Not manually invoking `end()` on these instances should be safe: their off-heap memory will
eventually be reclaimed by the finalizer thread, which is not a concern for thread-locals that are
instantiated infrequently.
This significantly reduces byte copying and object creation relative to the previous approach, which
created a fresh temporary buffer that was then resized multiple times during operation, copied bytes
out of that buffer into a freshly allocated `byte[]`, and needlessly used 4k stream buffers when
working with bytes that were already in arrays (`writeTo` now writes to the compression logic
efficiently).

Relates to #57284, which this change should help to some degree.
I also expect this change to speed up mapping/template updates a little, as those make heavy use of
these code paths.
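
To illustrate the shape of the change, here is a minimal sketch distilled from the hunks below
(`CompressSketch` is a hypothetical class; `data` stands for any in-memory `BytesReference`):

    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.compress.CompressorFactory;
    import org.elasticsearch.common.io.stream.BytesStreamOutput;
    import org.elasticsearch.common.io.stream.StreamOutput;

    import java.io.IOException;

    class CompressSketch {

        // Before: route the bytes through a stream wrapper into a temporary,
        // repeatedly resized buffer, then hand out that buffer's contents.
        static BytesReference compressOld(BytesReference data) throws IOException {
            BytesStreamOutput out = new BytesStreamOutput();
            try (StreamOutput compressed = CompressorFactory.COMPRESSOR.streamOutput(out)) {
                data.writeTo(compressed);
            }
            return out.bytes();
        }

        // After: a single call backed by thread-local Deflater and buffer instances.
        static BytesReference compressNew(BytesReference data) throws IOException {
            return CompressorFactory.COMPRESSOR.compress(data);
        }
    }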
Armin Braun 2020-08-12 11:06:23 +02:00 committed by GitHub
parent 35423a75af
commit 32423a486d
19 changed files with 105 additions and 81 deletions

GetIndexTemplatesResponseTests.java

@@ -112,7 +112,7 @@ public class GetIndexTemplatesResponseTests extends ESTestCase {
assertThat(result.version(), equalTo(esIMD.version()));
assertThat(esIMD.mappings().size(), equalTo(1));
-BytesArray mappingSource = new BytesArray(esIMD.mappings().valuesIt().next().uncompressed());
+BytesReference mappingSource = esIMD.mappings().valuesIt().next().uncompressed();
Map<String, Object> expectedMapping =
XContentHelper.convertToMap(mappingSource, true, xContentBuilder.contentType()).v2();
assertThat(result.mappings().sourceAsMap(), equalTo(expectedMapping.get("_doc")));

AliasMetadata.java

@@ -25,7 +25,6 @@ import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -343,7 +342,7 @@ public class AliasMetadata extends AbstractDiffable<AliasMetadata> implements To
if (binary) {
builder.field("filter", aliasMetadata.filter.compressed());
} else {
-builder.field("filter", XContentHelper.convertToMap(new BytesArray(aliasMetadata.filter().uncompressed()), true).v2());
+builder.field("filter", XContentHelper.convertToMap(aliasMetadata.filter().uncompressed(), true).v2());
}
}
if (aliasMetadata.indexRouting() != null) {

AliasValidator.java

@@ -22,6 +22,7 @@ package org.elasticsearch.cluster.metadata;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
+import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.NamedXContentRegistry;
import org.elasticsearch.common.xcontent.XContentFactory;
@@ -33,6 +34,7 @@ import org.elasticsearch.index.query.Rewriteable;
import org.elasticsearch.indices.InvalidAliasNameException;
import java.io.IOException;
+import java.io.InputStream;
import java.util.function.Function;
import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
@@ -125,11 +127,13 @@ public class AliasValidator {
* provided {@link org.elasticsearch.index.query.QueryShardContext}
* @throws IllegalArgumentException if the filter is not valid
*/
-public void validateAliasFilter(String alias, byte[] filter, QueryShardContext queryShardContext,
-NamedXContentRegistry xContentRegistry) {
+public void validateAliasFilter(String alias, BytesReference filter, QueryShardContext queryShardContext,
+NamedXContentRegistry xContentRegistry) {
assert queryShardContext != null;
-try (XContentParser parser = XContentFactory.xContent(filter)
-.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, filter)) {
+try (InputStream inputStream = filter.streamInput();
+XContentParser parser = XContentFactory.xContentType(inputStream).xContent()
+.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, filter.streamInput())) {
validateAliasFilter(parser, queryShardContext);
} catch (Exception e) {
throw new IllegalArgumentException("failed to parse filter for alias [" + alias + "]", e);

IndexMetadata.java

@@ -35,7 +35,6 @@ import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.node.DiscoveryNodeFilters;
import org.elasticsearch.cluster.routing.allocation.IndexMetadataUpdater;
import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenIntMap;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.MapBuilder;
@@ -1408,7 +1407,7 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
if (binary) {
builder.value(cursor.value.source().compressed());
} else {
-builder.map(XContentHelper.convertToMap(new BytesArray(cursor.value.source().uncompressed()), true).v2());
+builder.map(XContentHelper.convertToMap(cursor.value.source().uncompressed(), true).v2());
}
}
builder.endArray();
@@ -1416,7 +1415,7 @@ public class IndexMetadata implements Diffable<IndexMetadata>, ToXContentFragmen
builder.startObject(KEY_MAPPINGS);
for (ObjectObjectCursor<String, MappingMetadata> cursor : indexMetadata.getMappings()) {
Map<String, Object> mapping = XContentHelper
-.convertToMap(new BytesArray(cursor.value.source().uncompressed()), false).v2();
+.convertToMap(cursor.value.source().uncompressed(), false).v2();
if (mapping.size() == 1 && mapping.containsKey(cursor.key)) {
// the type name is the root value, reduce it
mapping = (Map<String, Object>) mapping.get(cursor.key);

IndexTemplateMetadata.java

@@ -27,7 +27,6 @@ import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.cluster.Diff;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.compress.CompressedXContent;
@@ -408,7 +407,7 @@ public class IndexTemplateMetadata extends AbstractDiffable<IndexTemplateMetadat
if (context == Metadata.XContentContext.API) {
builder.startObject("mappings");
for (ObjectObjectCursor<String, CompressedXContent> cursor1 : indexTemplateMetadata.mappings()) {
-Map<String, Object> mapping = XContentHelper.convertToMap(new BytesArray(cursor1.value.uncompressed()), false).v2();
+Map<String, Object> mapping = XContentHelper.convertToMap(cursor1.value.uncompressed(), false).v2();
if (mapping.size() == 1 && mapping.containsKey(cursor1.key)) {
// the type name is the root value, reduce it
mapping = (Map<String, Object>) mapping.get(cursor1.key);
@@ -425,8 +424,7 @@ public class IndexTemplateMetadata extends AbstractDiffable<IndexTemplateMetadat
for (ObjectObjectCursor<String, CompressedXContent> cursor : indexTemplateMetadata.mappings()) {
if (!cursor.key.equals(MapperService.DEFAULT_MAPPING)) {
assert documentMapping == null;
-byte[] mappingSource = cursor.value.uncompressed();
-Map<String, Object> mapping = XContentHelper.convertToMap(new BytesArray(mappingSource), true).v2();
+Map<String, Object> mapping = XContentHelper.convertToMap(cursor.value.uncompressed(), true).v2();
documentMapping = reduceMapping(cursor.key, mapping);
}
}
@@ -439,8 +437,7 @@ public class IndexTemplateMetadata extends AbstractDiffable<IndexTemplateMetadat
} else {
builder.startObject("mappings");
for (ObjectObjectCursor<String, CompressedXContent> cursor : indexTemplateMetadata.mappings()) {
-byte[] mappingSource = cursor.value.uncompressed();
-Map<String, Object> mapping = XContentHelper.convertToMap(new BytesArray(mappingSource), true).v2();
+Map<String, Object> mapping = XContentHelper.convertToMap(cursor.value.uncompressed(), true).v2();
mapping = reduceMapping(cursor.key, mapping);
builder.field(cursor.key);
builder.map(mapping);
@@ -450,8 +447,7 @@ public class IndexTemplateMetadata extends AbstractDiffable<IndexTemplateMetadat
} else {
builder.startArray("mappings");
for (ObjectObjectCursor<String, CompressedXContent> cursor : indexTemplateMetadata.mappings()) {
-byte[] data = cursor.value.uncompressed();
-builder.map(XContentHelper.convertToMap(new BytesArray(data), true).v2());
+builder.map(XContentHelper.convertToMap(cursor.value.uncompressed(), true).v2());
}
builder.endArray();
}

Template.java

@@ -23,7 +23,6 @@ import org.elasticsearch.cluster.AbstractDiffable;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.ParseField;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.io.stream.StreamInput;
import org.elasticsearch.common.io.stream.StreamOutput;
@@ -169,7 +168,7 @@ public class Template extends AbstractDiffable<Template> implements ToXContentOb
}
if (this.mappings != null) {
Map<String, Object> uncompressedMapping =
-XContentHelper.convertToMap(new BytesArray(this.mappings.uncompressed()), true, XContentType.JSON).v2();
+XContentHelper.convertToMap(this.mappings.uncompressed(), true, XContentType.JSON).v2();
if (uncompressedMapping.size() > 0) {
builder.field(MAPPINGS.getPreferredName());
builder.map(reduceMapping(uncompressedMapping));

CompressedXContent.java

@@ -19,7 +19,6 @@
package org.elasticsearch.common.compress;
-import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.io.Streams;
@@ -94,13 +93,9 @@ public final class CompressedXContent {
if (compressor != null) {
// already compressed...
this.bytes = BytesReference.toBytes(data);
-this.crc32 = crc32(new BytesArray(uncompressed()));
+this.crc32 = crc32(uncompressed());
} else {
-BytesStreamOutput out = new BytesStreamOutput();
-try (OutputStream compressedOutput = CompressorFactory.COMPRESSOR.streamOutput(out)) {
-data.writeTo(compressedOutput);
-}
-this.bytes = BytesReference.toBytes(out.bytes());
+this.bytes = BytesReference.toBytes(CompressorFactory.COMPRESSOR.compress(data));
this.crc32 = crc32(data);
}
assertConsistent();
@@ -108,7 +103,7 @@
private void assertConsistent() {
assert CompressorFactory.compressor(new BytesArray(bytes)) != null;
-assert this.crc32 == crc32(new BytesArray(uncompressed()));
+assert this.crc32 == crc32(uncompressed());
}
public CompressedXContent(byte[] data) throws IOException {
@@ -130,16 +125,16 @@
}
/** Return the uncompressed bytes. */
-public byte[] uncompressed() {
+public BytesReference uncompressed() {
try {
-return BytesReference.toBytes(CompressorFactory.uncompress(new BytesArray(bytes)));
+return CompressorFactory.uncompress(new BytesArray(bytes));
} catch (IOException e) {
throw new IllegalStateException("Cannot decompress compressed string", e);
}
}
public String string() {
-return new BytesRef(uncompressed()).utf8ToString();
+return uncompressed().utf8ToString();
}
public static CompressedXContent readCompressedString(StreamInput in) throws IOException {
@@ -167,7 +162,7 @@
return false;
}
-return Arrays.equals(uncompressed(), that.uncompressed());
+return uncompressed().equals(that.uncompressed());
}
@Override

Compressor.java

@@ -39,4 +39,20 @@ public interface Compressor {
* output. Closing the returned {@link StreamOutput} will close the provided stream output.
*/
StreamOutput streamOutput(OutputStream out) throws IOException;
+
+/**
+ * Decompress bytes into a newly allocated buffer.
+ *
+ * @param bytesReference bytes to decompress
+ * @return decompressed bytes
+ */
+BytesReference uncompress(BytesReference bytesReference) throws IOException;
+
+/**
+ * Compress bytes into a newly allocated buffer.
+ *
+ * @param bytesReference bytes to compress
+ * @return compressed bytes
+ */
+BytesReference compress(BytesReference bytesReference) throws IOException;
}
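
As a usage sketch of the new interface methods (a hypothetical snippet, not part of this commit;
it assumes the `CompressorFactory.COMPRESSOR` singleton shown in the surrounding hunks):

    import org.elasticsearch.common.bytes.BytesArray;
    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.compress.CompressorFactory;

    import java.io.IOException;

    class RoundTripSketch {
        static void demo() throws IOException {
            BytesReference original = new BytesArray("{\"field\":\"value\"}");
            // compress() returns the bytes with the compression header prepended
            BytesReference compressed = CompressorFactory.COMPRESSOR.compress(original);
            // uncompress() strips the header and inflates the remainder
            BytesReference restored = CompressorFactory.COMPRESSOR.uncompress(compressed);
            assert restored.equals(original);
        }
    }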

CompressorFactory.java

@@ -21,7 +21,6 @@ package org.elasticsearch.common.compress;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.bytes.BytesReference;
-import org.elasticsearch.common.io.Streams;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.common.xcontent.XContentType;
@@ -71,14 +70,7 @@ public class CompressorFactory {
*/
public static BytesReference uncompressIfNeeded(BytesReference bytes) throws IOException {
Compressor compressor = compressor(Objects.requireNonNull(bytes, "the BytesReference must not be null"));
-BytesReference uncompressed;
-if (compressor != null) {
-uncompressed = uncompress(bytes, compressor);
-} else {
-uncompressed = bytes;
-}
-return uncompressed;
+return compressor == null ? bytes : compressor.uncompress(bytes);
}
/** Decompress the provided {@link BytesReference}. */
@@ -87,10 +79,6 @@
if (compressor == null) {
throw new NotCompressedException();
}
-return uncompress(bytes, compressor);
-}
-
-private static BytesReference uncompress(BytesReference bytes, Compressor compressor) throws IOException {
-return Streams.readFully(compressor.streamInput(bytes.streamInput()));
+return compressor.uncompress(bytes);
}
}
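
A brief sketch of the distinction between the two entry points above (the class and method names
here are hypothetical):

    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.compress.CompressorFactory;

    import java.io.IOException;

    class UncompressSketch {
        // Lenient: returns the input unchanged when it carries no compression header.
        static BytesReference lenient(BytesReference stored) throws IOException {
            return CompressorFactory.uncompressIfNeeded(stored);
        }

        // Strict: throws NotCompressedException when the input is not compressed.
        static BytesReference strict(BytesReference stored) throws IOException {
            return CompressorFactory.uncompress(stored);
        }
    }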

DeflateCompressor.java

@@ -20,6 +20,7 @@
package org.elasticsearch.common.compress;
import org.elasticsearch.common.bytes.BytesReference;
+import org.elasticsearch.common.io.stream.BytesStreamOutput;
import org.elasticsearch.common.io.stream.InputStreamStreamInput;
import org.elasticsearch.common.io.stream.OutputStreamStreamOutput;
import org.elasticsearch.common.io.stream.StreamInput;
@@ -36,6 +37,7 @@ import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterInputStream;
+import java.util.zip.InflaterOutputStream;
/**
* {@link Compressor} implementation based on the DEFLATE compression algorithm.
@@ -129,4 +131,41 @@ public class DeflateCompressor implements Compressor {
}
};
}
+
+// Reusable Inflater reference. Note: This is not used for the decompressing stream wrapper because we don't have strong guarantees
+// about the scope in which the stream wrapper is used.
+private static final ThreadLocal<Inflater> inflaterRef = ThreadLocal.withInitial(() -> new Inflater(true));
+
+private static final ThreadLocal<BytesStreamOutput> baos = ThreadLocal.withInitial(BytesStreamOutput::new);
+
+@Override
+public BytesReference uncompress(BytesReference bytesReference) throws IOException {
+final BytesStreamOutput buffer = baos.get();
+final Inflater inflater = inflaterRef.get();
+inflater.reset();
+try (InflaterOutputStream ios = new InflaterOutputStream(buffer, inflater)) {
+bytesReference.slice(HEADER.length, bytesReference.length() - HEADER.length).writeTo(ios);
+}
+final BytesReference res = buffer.copyBytes();
+buffer.reset();
+return res;
+}
+
+// Reusable Deflater reference. Note: This is not used for the compressing stream wrapper because we don't have strong guarantees
+// about the scope in which the stream wrapper is used.
+private static final ThreadLocal<Deflater> deflaterRef = ThreadLocal.withInitial(() -> new Deflater(LEVEL, true));
+
+@Override
+public BytesReference compress(BytesReference bytesReference) throws IOException {
+final BytesStreamOutput buffer = baos.get();
+final Deflater deflater = deflaterRef.get();
+deflater.reset();
+buffer.write(HEADER);
+try (DeflaterOutputStream dos = new DeflaterOutputStream(buffer, deflater, true)) {
+bytesReference.writeTo(dos);
+}
+final BytesReference res = buffer.copyBytes();
+buffer.reset();
+return res;
+}
}
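
Because the deflater, inflater, and buffer are thread-locals, concurrent callers never share
state; each thread lazily creates and then reuses its own instances. A hypothetical sketch:

    import org.elasticsearch.common.bytes.BytesArray;
    import org.elasticsearch.common.bytes.BytesReference;
    import org.elasticsearch.common.compress.CompressorFactory;

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    class ConcurrentCompressSketch {
        public static void main(String[] args) {
            ExecutorService pool = Executors.newFixedThreadPool(4);
            for (int i = 0; i < 16; i++) {
                final int id = i;
                pool.execute(() -> {
                    try {
                        BytesReference data = new BytesArray("{\"task\":" + id + "}");
                        // Each pool thread reuses its own Deflater/Inflater and buffer.
                        BytesReference compressed = CompressorFactory.COMPRESSOR.compress(data);
                        assert CompressorFactory.COMPRESSOR.uncompress(compressed).equals(data);
                    } catch (Exception e) {
                        throw new AssertionError(e);
                    }
                });
            }
            pool.shutdown();
        }
    }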

IndicesService.java

@@ -137,6 +137,7 @@ import org.elasticsearch.threadpool.ThreadPool;
import java.io.Closeable;
import java.io.IOException;
+import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.util.ArrayList;
@@ -1493,9 +1494,10 @@ public class IndicesService extends AbstractLifecycleComponent
public AliasFilter buildAliasFilter(ClusterState state, String index, Set<String> resolvedExpressions) {
/* Being static, parseAliasFilter doesn't have access to whatever guts it needs to parse a query. Instead of passing in a bunch
* of dependencies we pass in a function that can perform the parsing. */
-CheckedFunction<byte[], QueryBuilder, IOException> filterParser = bytes -> {
-try (XContentParser parser = XContentFactory.xContent(bytes)
-.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, bytes)) {
+CheckedFunction<BytesReference, QueryBuilder, IOException> filterParser = bytes -> {
+try (InputStream inputStream = bytes.streamInput();
+XContentParser parser = XContentFactory.xContentType(inputStream).xContent()
+.createParser(xContentRegistry, LoggingDeprecationHandler.INSTANCE, inputStream)) {
return parseInnerQueryBuilder(parser);
}
};

BlobStoreRepository.java

@@ -67,8 +67,6 @@ import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.compress.CompressorFactory;
import org.elasticsearch.common.compress.NotXContentException;
import org.elasticsearch.common.io.Streams;
-import org.elasticsearch.common.io.stream.BytesStreamOutput;
-import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.lease.Releasable;
import org.elasticsearch.common.lucene.Lucene;
import org.elasticsearch.common.lucene.store.InputStreamIndexInput;
@@ -1318,12 +1316,8 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
private void cacheRepositoryData(BytesReference updated, long generation) {
if (cacheRepositoryData && bestEffortConsistency == false) {
final BytesReference serialized;
-BytesStreamOutput out = new BytesStreamOutput();
try {
-try (StreamOutput tmp = CompressorFactory.COMPRESSOR.streamOutput(out)) {
-updated.writeTo(tmp);
-}
-serialized = out.bytes();
+serialized = CompressorFactory.COMPRESSOR.compress(updated);
final int len = serialized.length();
if (len > ByteSizeUnit.KB.toBytes(500)) {
logger.debug("Not caching repository data of size [{}] for repository [{}] because it is larger than 500KB in" +

ShardSearchRequest.java

@@ -441,7 +441,7 @@ public class ShardSearchRequest extends TransportRequest implements IndicesReque
* The list of filtering aliases should be obtained by calling Metadata.filteringAliases.
* Returns {@code null} if no filtering is required.</p>
*/
-public static QueryBuilder parseAliasFilter(CheckedFunction<byte[], QueryBuilder, IOException> filterParser,
+public static QueryBuilder parseAliasFilter(CheckedFunction<BytesReference, QueryBuilder, IOException> filterParser,
IndexMetadata metadata, String... aliasNames) {
if (aliasNames == null || aliasNames.length == 0) {
return null;

DeflateCompressedXContentTests.java

@@ -38,7 +38,7 @@ public class DeflateCompressedXContentTests extends ESTestCase {
private void assertEquals(CompressedXContent s1, CompressedXContent s2) {
Assert.assertEquals(s1, s2);
-assertArrayEquals(s1.uncompressed(), s2.uncompressed());
+assertEquals(s1.uncompressed(), s2.uncompressed());
assertEquals(s1.hashCode(), s2.hashCode());
}

ShardSearchRequestTests.java

@@ -24,7 +24,6 @@ import org.elasticsearch.action.OriginalIndices;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.cluster.metadata.AliasMetadata;
import org.elasticsearch.cluster.metadata.IndexMetadata;
-import org.elasticsearch.common.CheckedFunction;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.compress.CompressedXContent;
@@ -44,6 +43,7 @@ import org.elasticsearch.test.VersionUtils;
import org.elasticsearch.search.SearchSortValuesAndFormatsTests;
import java.io.IOException;
+import java.io.InputStream;
import static org.elasticsearch.index.query.AbstractQueryBuilder.parseInnerQueryBuilder;
import static org.elasticsearch.index.query.QueryBuilders.termQuery;
@@ -188,12 +188,12 @@ public class ShardSearchRequestTests extends AbstractSearchTestCase {
}
public QueryBuilder aliasFilter(IndexMetadata indexMetadata, String... aliasNames) {
-CheckedFunction<byte[], QueryBuilder, IOException> filterParser = bytes -> {
-try (XContentParser parser = XContentFactory.xContent(bytes)
-.createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, bytes)) {
+return ShardSearchRequest.parseAliasFilter(bytes -> {
+try (InputStream inputStream = bytes.streamInput();
+XContentParser parser = XContentFactory.xContentType(inputStream).xContent()
+.createParser(xContentRegistry(), DeprecationHandler.THROW_UNSUPPORTED_OPERATION, inputStream)) {
return parseInnerQueryBuilder(parser);
}
-};
-return ShardSearchRequest.parseAliasFilter(filterParser, indexMetadata, aliasNames);
+}, indexMetadata, aliasNames);
}
}
}

TemplateUtils.java

@@ -12,7 +12,6 @@ import org.elasticsearch.Version;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent;
@@ -167,9 +166,7 @@ public class TemplateUtils {
for (Object typeMapping : mappings.values().toArray()) {
CompressedXContent typeMappingXContent = (CompressedXContent) typeMapping;
try {
-Map<String, Object> typeMappingMap = convertToMap(
-new BytesArray(typeMappingXContent.uncompressed()), false,
-XContentType.JSON).v2();
+Map<String, Object> typeMappingMap = convertToMap(typeMappingXContent.uncompressed(), false, XContentType.JSON).v2();
// should always contain one entry with key = typename
assert (typeMappingMap.size() == 1);
String key = typeMappingMap.keySet().iterator().next();

TransportGetRollupCapsAction.java

@@ -13,7 +13,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.MappingMetadata;
import org.elasticsearch.cluster.metadata.Metadata;
import org.elasticsearch.cluster.service.ClusterService;
-import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.tasks.Task;
@@ -99,8 +98,7 @@ public class TransportGetRollupCapsAction extends HandledTransportAction<GetRoll
return Optional.empty();
}
-RollupIndexCaps caps = RollupIndexCaps.parseMetadataXContent(
-new BytesArray(rollupMapping.source().uncompressed()), indexName);
+RollupIndexCaps caps = RollupIndexCaps.parseMetadataXContent(rollupMapping.source().uncompressed(), indexName);
if (caps.hasCaps()) {
return Optional.of(caps);

TransformInternalIndex.java

@@ -18,7 +18,6 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexTemplateMetadata;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentBuilder;
@@ -377,7 +376,7 @@ public final class TransformInternalIndex {
// Installing the template involves communication with the master node, so it's more expensive but much rarer
try {
IndexTemplateMetadata indexTemplateMetadata = getIndexTemplateMetadata();
-BytesReference jsonMappings = new BytesArray(indexTemplateMetadata.mappings().get(SINGLE_MAPPING_NAME).uncompressed());
+BytesReference jsonMappings = indexTemplateMetadata.mappings().get(SINGLE_MAPPING_NAME).uncompressed();
PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.LATEST_INDEX_VERSIONED_NAME)
.patterns(indexTemplateMetadata.patterns())
.version(indexTemplateMetadata.version())
@@ -413,7 +412,7 @@ public final class TransformInternalIndex {
// Installing the template involves communication with the master node, so it's more expensive but much rarer
try {
IndexTemplateMetadata indexTemplateMetadata = getAuditIndexTemplateMetadata(clusterService.state().nodes().getMinNodeVersion());
-BytesReference jsonMappings = new BytesArray(indexTemplateMetadata.mappings().get(SINGLE_MAPPING_NAME).uncompressed());
+BytesReference jsonMappings = indexTemplateMetadata.mappings().get(SINGLE_MAPPING_NAME).uncompressed();
PutIndexTemplateRequest request = new PutIndexTemplateRequest(TransformInternalIndexConstants.AUDIT_INDEX).patterns(
indexTemplateMetadata.patterns()
)

HistoryIntegrationTests.java

@@ -7,7 +7,6 @@ package org.elasticsearch.xpack.watcher.test.integration;
import org.elasticsearch.action.admin.indices.mapping.get.GetMappingsResponse;
import org.elasticsearch.action.search.SearchResponse;
-import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentType;
import org.elasticsearch.protocol.xpack.watcher.PutWatchResponse;
@@ -101,8 +100,8 @@ public class HistoryIntegrationTests extends AbstractWatcherIntegrationTestCase
// as fields with dots are allowed in 5.0 again, the mapping must be checked in addition
GetMappingsResponse response = client().admin().indices().prepareGetMappings(".watcher-history*")
.addTypes(SINGLE_MAPPING_NAME).get();
-byte[] bytes = response.getMappings().values().iterator().next().value.get(SINGLE_MAPPING_NAME).source().uncompressed();
-XContentSource source = new XContentSource(new BytesArray(bytes), XContentType.JSON);
+XContentSource source = new XContentSource(response.getMappings().values().iterator().next().value.get(SINGLE_MAPPING_NAME)
+.source().uncompressed(), XContentType.JSON);
// lets make sure the body fields are disabled
if (useChained) {
String chainedPath = SINGLE_MAPPING_NAME +
@@ -142,8 +141,8 @@ public class HistoryIntegrationTests extends AbstractWatcherIntegrationTestCase
// as fields with dots are allowed in 5.0 again, the mapping must be checked in addition
GetMappingsResponse response = client().admin().indices().prepareGetMappings(".watcher-history*")
.addTypes(SINGLE_MAPPING_NAME).get();
-byte[] bytes = response.getMappings().values().iterator().next().value.get(SINGLE_MAPPING_NAME).source().uncompressed();
-XContentSource source = new XContentSource(new BytesArray(bytes), XContentType.JSON);
+XContentSource source = new XContentSource(response.getMappings().values().iterator().next().value.get(SINGLE_MAPPING_NAME)
+.source().uncompressed(), XContentType.JSON);
// lets make sure the body fields are disabled
if (useChained) {
@@ -200,8 +199,8 @@ public class HistoryIntegrationTests extends AbstractWatcherIntegrationTestCase
// also ensure that the status field is disabled in the watch history
GetMappingsResponse response = client().admin().indices().prepareGetMappings(".watcher-history*")
.addTypes(SINGLE_MAPPING_NAME).get();
-byte[] bytes = response.getMappings().values().iterator().next().value.get(SINGLE_MAPPING_NAME).source().uncompressed();
-XContentSource mappingSource = new XContentSource(new BytesArray(bytes), XContentType.JSON);
+XContentSource mappingSource = new XContentSource(response.getMappings().values().iterator().next().value.get(SINGLE_MAPPING_NAME)
+.source().uncompressed(), XContentType.JSON);
assertThat(mappingSource.getValue(SINGLE_MAPPING_NAME + ".properties.status.enabled"), is(false));
assertThat(mappingSource.getValue(SINGLE_MAPPING_NAME + ".properties.status.properties.status"), is(nullValue()));
assertThat(mappingSource.getValue(SINGLE_MAPPING_NAME + ".properties.status.properties.status.properties.active"), is(nullValue()));