Reduce allocations when persisting cluster state (#61159)

Today we allocate a new `byte[]` for each document written to the
cluster state. Some of these documents may be quite large. We need a
buffer that's at least as large as the largest document, but there's no
need to use a fresh buffer for each document.

With this commit we re-use the same `byte[]` across documents, allocating
a fresh one only when a larger buffer is needed, and we use the buffer size
needed for one round of persistence as a hint for sizing the buffer for the
next round.
David Turner 2020-08-17 13:44:58 +01:00
parent 524247bbc0
commit b21cb7f466
8 changed files with 429 additions and 145 deletions
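
For orientation, the following is a rough sketch (not part of the commit) of the reuse pattern described above, written against the `RecyclingBytesStreamOutput` class added below. The wrapper class, field and method names, and the initial buffer size are invented for illustration, and `BigArrays.NON_RECYCLING_INSTANCE` stands in for the node's real `BigArrays`:

import java.io.IOException;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;

class BufferReuseSketch {
    private byte[] buffer = new byte[4096]; // arbitrary starting size, grows as needed

    BytesRef serialize(byte[] serializedDocument) throws IOException {
        try (RecyclingBytesStreamOutput out =
                 new RecyclingBytesStreamOutput(buffer, BigArrays.NON_RECYCLING_INSTANCE)) {
            out.writeBytes(serializedDocument, 0, serializedDocument.length);
            final BytesRef bytesRef = out.toBytesRef();
            // If the document did not fit, toBytesRef() returned a larger array; keep it so that
            // the next document re-uses it instead of allocating afresh.
            buffer = bytesRef.bytes;
            return bytesRef;
        }
    }
}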


@@ -253,7 +253,7 @@ public class JoinHelper {
         }
 
         void logWarnWithTimestamp() {
-            logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ago, failed to join {} with {}",
+            logger.warn(() -> new ParameterizedMessage("last failed join attempt was {} ago, failed to join {} with {}",
                 TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)),
                 destination,
                 joinRequest),


@@ -0,0 +1,148 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.bytes;

import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.BytesRefIterator;
import org.elasticsearch.common.Nullable;
import org.elasticsearch.common.io.stream.BytesStream;
import org.elasticsearch.common.io.stream.StreamOutput;
import org.elasticsearch.common.util.BigArrays;
import org.elasticsearch.common.util.ByteArray;
import org.elasticsearch.core.internal.io.IOUtils;

import java.io.IOException;
import java.util.Objects;

/**
 * An in-memory {@link StreamOutput} which first fills the given {@code byte[]} and then allocates more space from the given
 * {@link BigArrays} if needed. The idea is that you can use this for passing data to an API that requires a single {@code byte[]} (or a
 * {@link org.apache.lucene.util.BytesRef}) which you'd prefer to re-use if possible, avoiding excessive allocations, but which may not
 * always be large enough.
 */
public class RecyclingBytesStreamOutput extends BytesStream {

    private final byte[] buffer;
    private final BigArrays bigArrays;

    private int position;

    @Nullable // if buffer is large enough
    private ByteArray overflow;

    public RecyclingBytesStreamOutput(byte[] buffer, BigArrays bigArrays) {
        this.buffer = Objects.requireNonNull(buffer);
        this.bigArrays = Objects.requireNonNull(bigArrays);
    }

    @Override
    public void writeByte(byte b) {
        if (position < buffer.length) {
            buffer[position++] = b;
        } else {
            ensureCapacity(position + 1);
            overflow.set(position++ - buffer.length, b);
        }
    }

    private void ensureCapacity(int size) {
        final int overflowSize = size - buffer.length;
        assert overflowSize > 0 : "no need to ensureCapacity(" + size + ") with buffer of size [" + buffer.length + "]";
        assert position >= buffer.length
            : "no need to ensureCapacity(" + size + ") with buffer of size [" + buffer.length + "] at position [" + position + "]";
        if (overflow == null) {
            overflow = bigArrays.newByteArray(overflowSize, false);
        } else if (overflowSize > overflow.size()) {
            overflow = bigArrays.resize(overflow, overflowSize);
        }
        assert overflow.size() >= overflowSize;
    }

    @Override
    public void writeBytes(byte[] b, int offset, int length) {
        if (position < buffer.length) {
            final int lengthForBuffer = Math.min(length, buffer.length - position);
            System.arraycopy(b, offset, buffer, position, lengthForBuffer);
            position += lengthForBuffer;
            offset += lengthForBuffer;
            length -= lengthForBuffer;
        }

        if (length > 0) {
            ensureCapacity(position + length);
            overflow.set(position - buffer.length, b, offset, length);
            position += length;
        }
    }

    @Override
    public void flush() {
    }

    @Override
    public void close() throws IOException {
        IOUtils.close(overflow);
    }

    @Override
    public void reset() throws IOException {
        throw new UnsupportedOperationException();
    }

    /**
     * Return the written bytes in a {@link BytesRef}, avoiding allocating a new {@code byte[]} if the original buffer was already large
     * enough. If we allocate a new (larger) buffer here then callers should typically re-use it for subsequent streams.
     */
    public BytesRef toBytesRef() {
        if (position <= buffer.length) {
            assert overflow == null;
            return new BytesRef(buffer, 0, position);
        }

        final byte[] newBuffer = new byte[position];
        System.arraycopy(buffer, 0, newBuffer, 0, buffer.length);
        int copyPos = buffer.length;
        final BytesRefIterator iterator = new PagedBytesReference(overflow, position - buffer.length).iterator();
        BytesRef bytesRef;
        try {
            while ((bytesRef = iterator.next()) != null) {
                assert copyPos + bytesRef.length <= position;
                System.arraycopy(bytesRef.bytes, bytesRef.offset, newBuffer, copyPos, bytesRef.length);
                copyPos += bytesRef.length;
            }
        } catch (IOException e) {
            throw new AssertionError("impossible", e);
        }

        return new BytesRef(newBuffer, 0, position);
    }

    @Override
    public BytesReference bytes() {
        if (position <= buffer.length) {
            assert overflow == null;
            return new BytesArray(buffer, 0, position);
        } else {
            return CompositeBytesReference.of(
                new BytesArray(buffer, 0, buffer.length),
                new PagedBytesReference(overflow, position - buffer.length));
        }
    }
}
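
Illustrative contrast between the two accessors on the class above (assumed caller code, not part of the commit): `bytes()` returns a composite view over the fixed buffer plus any overflow pages without copying, whereas `toBytesRef()` copies into a single contiguous `byte[]` that callers can keep for the next round.

import java.io.IOException;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
import org.elasticsearch.common.util.BigArrays;

class AccessorContrastSketch {
    static void demo() throws IOException {
        try (RecyclingBytesStreamOutput out =
                 new RecyclingBytesStreamOutput(new byte[16], BigArrays.NON_RECYCLING_INSTANCE)) {
            out.writeBytes(new byte[100], 0, 100);   // overflows the 16-byte buffer into BigArrays

            BytesReference view = out.bytes();       // composite view over buffer + overflow, no copy
            BytesRef contiguous = out.toBytesRef();  // one fresh byte[100]; re-use contiguous.bytes later

            assert view.length() == 100 && contiguous.length == 100;
        }
    }
}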


@@ -46,6 +46,7 @@ import org.apache.lucene.store.Directory;
 import org.apache.lucene.store.SimpleFSDirectory;
 import org.apache.lucene.util.Bits;
 import org.apache.lucene.util.BytesRef;
+import org.apache.lucene.util.BytesRefIterator;
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.Version;
 import org.elasticsearch.cluster.ClusterState;
@@ -53,14 +54,19 @@ import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.cluster.metadata.Metadata;
 import org.elasticsearch.common.CheckedConsumer;
 import org.elasticsearch.common.Nullable;
-import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput;
+import org.elasticsearch.common.bytes.PagedBytesReference;
+import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
+import org.elasticsearch.common.io.Streams;
 import org.elasticsearch.common.lease.Releasable;
+import org.elasticsearch.common.lease.Releasables;
 import org.elasticsearch.common.logging.Loggers;
 import org.elasticsearch.common.lucene.Lucene;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Setting;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.ByteArray;
+import org.elasticsearch.common.util.PageCacheRecycler;
 import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
 import org.elasticsearch.common.xcontent.NamedXContentRegistry;
 import org.elasticsearch.common.xcontent.ToXContent;
@@ -73,7 +79,6 @@ import org.elasticsearch.env.NodeMetadata;
 import org.elasticsearch.index.Index;
 
 import java.io.Closeable;
-import java.io.FilterOutputStream;
 import java.io.IOError;
 import java.io.IOException;
 import java.nio.file.Files;
@@ -452,29 +457,6 @@ public class PersistedClusterStateService {
         FORMAT_PARAMS = new ToXContent.MapParams(params);
     }
 
-    /**
-     * A {@link Document} with a stored field containing serialized metadata written to a {@link ReleasableBytesStreamOutput} which must be
-     * released when no longer needed.
-     */
-    private static class ReleasableDocument implements Releasable {
-        private final Document document;
-        private final Releasable releasable;
-
-        ReleasableDocument(Document document, Releasable releasable) {
-            this.document = document;
-            this.releasable = releasable;
-        }
-
-        Document getDocument() {
-            return document;
-        }
-
-        @Override
-        public void close() {
-            releasable.close();
-        }
-    }
-
     /**
      * Encapsulates a single {@link IndexWriter} with its {@link Directory} for ease of closing, and a {@link Logger}. There is one of these
      * for each data path.
@@ -547,6 +529,10 @@
         boolean fullStateWritten = false;
         private final AtomicBoolean closed = new AtomicBoolean();
 
+        // The size of the document buffer that was used for the last write operation, used as a hint for allocating the buffer for the
+        // next one.
+        private int documentBufferUsed;
+
         private Writer(List<MetadataIndexWriter> metadataIndexWriters, String nodeId, BigArrays bigArrays,
                        LongSupplier relativeTimeMillisSupplier, Supplier<TimeValue> slowWriteLoggingThresholdSupplier) {
             this.metadataIndexWriters = metadataIndexWriters;
@@ -650,56 +636,60 @@ public class PersistedClusterStateService {
             logger.trace("currentTerm [{}] matches previous currentTerm, writing changes only",
                 metadata.coordinationMetadata().term());
 
-            final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previouslyWrittenMetadata, metadata) == false;
-            if (updateGlobalMeta) {
-                try (ReleasableDocument globalMetadataDocument = makeGlobalMetadataDocument(metadata)) {
+            try (DocumentBuffer documentBuffer = allocateBuffer()) {
+
+                final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previouslyWrittenMetadata, metadata) == false;
+                if (updateGlobalMeta) {
+                    final Document globalMetadataDocument = makeGlobalMetadataDocument(metadata, documentBuffer);
                     for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                        metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument.getDocument());
+                        metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument);
                     }
                 }
-            }
 
                 final Map<String, Long> indexMetadataVersionByUUID = new HashMap<>(previouslyWrittenMetadata.indices().size());
                 for (ObjectCursor<IndexMetadata> cursor : previouslyWrittenMetadata.indices().values()) {
                     final IndexMetadata indexMetadata = cursor.value;
-                    final Long previousValue = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion());
+                    final Long previousValue
+                        = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion());
                     assert previousValue == null : indexMetadata.getIndexUUID() + " already mapped to " + previousValue;
                 }
 
                 int numIndicesUpdated = 0;
                 int numIndicesUnchanged = 0;
                 for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
                     final IndexMetadata indexMetadata = cursor.value;
                     final Long previousVersion = indexMetadataVersionByUUID.get(indexMetadata.getIndexUUID());
                     if (previousVersion == null || indexMetadata.getVersion() != previousVersion) {
                         logger.trace("updating metadata for [{}], changing version from [{}] to [{}]",
                             indexMetadata.getIndex(), previousVersion, indexMetadata.getVersion());
                         numIndicesUpdated++;
-                        try (ReleasableDocument indexMetadataDocument = makeIndexMetadataDocument(indexMetadata)) {
-                            for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                                metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument.getDocument(), indexMetadata.getIndex());
-                            }
+                        final Document indexMetadataDocument = makeIndexMetadataDocument(indexMetadata, documentBuffer);
+                        for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                            metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument, indexMetadata.getIndex());
                         }
                     } else {
                         numIndicesUnchanged++;
                         logger.trace("no action required for [{}]", indexMetadata.getIndex());
                     }
                     indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID());
                 }
 
+                documentBufferUsed = documentBuffer.getMaxUsed();
+
                 for (String removedIndexUUID : indexMetadataVersionByUUID.keySet()) {
                     for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
                         metadataIndexWriter.deleteIndexMetadata(removedIndexUUID);
                     }
                 }
 
                 // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more
                 // gracefully than one that occurs during the commit process.
                 for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
                     metadataIndexWriter.flush();
                 }
 
                 return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged);
+            }
         }
 
         /**
@@ -716,28 +706,39 @@ public class PersistedClusterStateService {
          * Add documents for the metadata of the given cluster state, assuming that there are currently no documents.
          */
         private WriterStats addMetadata(Metadata metadata) throws IOException {
-            try (ReleasableDocument globalMetadataDocument = makeGlobalMetadataDocument(metadata)) {
-                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                    metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument.getDocument());
-                }
-            }
+            try (DocumentBuffer documentBuffer = allocateBuffer()) {
 
-            for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
-                final IndexMetadata indexMetadata = cursor.value;
-                try (ReleasableDocument indexMetadataDocument = makeIndexMetadataDocument(indexMetadata)) {
+                final Document globalMetadataDocument = makeGlobalMetadataDocument(metadata, documentBuffer);
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument);
+                }
+
+                for (ObjectCursor<IndexMetadata> cursor : metadata.indices().values()) {
+                    final IndexMetadata indexMetadata = cursor.value;
+                    final Document indexMetadataDocument = makeIndexMetadataDocument(indexMetadata, documentBuffer);
                     for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                        metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument.getDocument(), indexMetadata.getIndex());
+                        metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument, indexMetadata.getIndex());
                     }
                 }
-            }
 
-            // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more
-            // gracefully than one that occurs during the commit process.
-            for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
-                metadataIndexWriter.flush();
-            }
+                documentBufferUsed = documentBuffer.getMaxUsed();
 
-            return new WriterStats(true, metadata.indices().size(), 0);
+                // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more
+                // gracefully than one that occurs during the commit process.
+                for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) {
+                    metadataIndexWriter.flush();
+                }
+
+                return new WriterStats(true, metadata.indices().size(), 0);
+            }
         }
 
+        private DocumentBuffer allocateBuffer() {
+            // heuristics for picking the initial buffer size based on the buffer we needed last time: try and fit within a single page,
+            // but if we needed more than a single page last time then allow a bit more space to try and avoid needing to grow the buffer
+            // later on.
+            final int extraSpace = documentBufferUsed <= PageCacheRecycler.PAGE_SIZE_IN_BYTES ? 0 : PageCacheRecycler.PAGE_SIZE_IN_BYTES;
+            return new DocumentBuffer(documentBufferUsed + extraSpace, bigArrays);
+        }
+
         public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion) throws IOException {
@@ -802,59 +803,97 @@
             }
         }
 
-        private ReleasableDocument makeIndexMetadataDocument(IndexMetadata indexMetadata) throws IOException {
-            final ReleasableDocument indexMetadataDocument = makeDocument(INDEX_TYPE_NAME, indexMetadata);
-            boolean success = false;
-            try {
-                final String indexUUID = indexMetadata.getIndexUUID();
-                assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false;
-                indexMetadataDocument.getDocument().add(new StringField(INDEX_UUID_FIELD_NAME, indexUUID, Field.Store.NO));
-                success = true;
-                return indexMetadataDocument;
-            } finally {
-                if (success == false) {
-                    IOUtils.closeWhileHandlingException(indexMetadataDocument);
-                }
-            }
+        private Document makeIndexMetadataDocument(IndexMetadata indexMetadata, DocumentBuffer documentBuffer) throws IOException {
+            final Document indexMetadataDocument = makeDocument(INDEX_TYPE_NAME, indexMetadata, documentBuffer);
+            final String indexUUID = indexMetadata.getIndexUUID();
+            assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false;
+            indexMetadataDocument.add(new StringField(INDEX_UUID_FIELD_NAME, indexUUID, Field.Store.NO));
+            return indexMetadataDocument;
         }
 
-        private ReleasableDocument makeGlobalMetadataDocument(Metadata metadata) throws IOException {
-            return makeDocument(GLOBAL_TYPE_NAME, metadata);
+        private Document makeGlobalMetadataDocument(Metadata metadata, DocumentBuffer documentBuffer) throws IOException {
+            return makeDocument(GLOBAL_TYPE_NAME, metadata, documentBuffer);
         }
 
-        private ReleasableDocument makeDocument(String typeName, ToXContent metadata) throws IOException {
+        private Document makeDocument(String typeName, ToXContent metadata, DocumentBuffer documentBuffer) throws IOException {
             final Document document = new Document();
             document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO));
 
-            boolean success = false;
-            final ReleasableBytesStreamOutput releasableBytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays);
-            try {
-                final FilterOutputStream outputStream = new FilterOutputStream(releasableBytesStreamOutput) {
-                    @Override
-                    public void write(byte[] b, int off, int len) throws IOException {
-                        out.write(b, off, len);
-                    }
-
-                    @Override
-                    public void close() {
-                        // closing the XContentBuilder should not release the bytes yet
-                    }
-                };
-                try (XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.SMILE, outputStream)) {
+            try (RecyclingBytesStreamOutput streamOutput = documentBuffer.streamOutput()) {
+                try (XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.SMILE,
+                    Streams.flushOnCloseStream(streamOutput))) {
                     xContentBuilder.startObject();
                     metadata.toXContent(xContentBuilder, FORMAT_PARAMS);
                     xContentBuilder.endObject();
                 }
-                document.add(new StoredField(DATA_FIELD_NAME, releasableBytesStreamOutput.bytes().toBytesRef()));
-                final ReleasableDocument releasableDocument = new ReleasableDocument(document, releasableBytesStreamOutput);
-                success = true;
-                return releasableDocument;
-            } finally {
-                if (success == false) {
-                    IOUtils.closeWhileHandlingException(releasableBytesStreamOutput);
-                }
+                document.add(new StoredField(DATA_FIELD_NAME, streamOutput.toBytesRef()));
             }
+
+            return document;
+        }
+
+        /**
+         * Holds the current buffer, keeping track of new allocations as it grows.
+         */
+        private static class DocumentBuffer implements Releasable {
+            private final BigArrays bigArrays;
+
+            @Nullable // if the initial page doesn't need releasing
+            private final Releasable releasable;
+
+            private byte[] buffer;
+            private int maxUsed;
+
+            DocumentBuffer(int size, BigArrays bigArrays) {
+                if (size <= PageCacheRecycler.PAGE_SIZE_IN_BYTES) {
+                    final ByteArray byteArray = bigArrays.newByteArray(PageCacheRecycler.PAGE_SIZE_IN_BYTES);
+                    final BytesRefIterator iterator = new PagedBytesReference(byteArray, Math.toIntExact(byteArray.size())).iterator();
+                    final BytesRef firstPage;
+                    try {
+                        firstPage = iterator.next();
+                        assert iterator.next() == null : "should be one page";
+                    } catch (IOException e) {
+                        throw new AssertionError("impossible", e);
+                    }
+
+                    // we require that we have the whole page to ourselves
+                    assert firstPage.offset == 0 : firstPage.offset;
+                    assert firstPage.bytes.length == PageCacheRecycler.PAGE_SIZE_IN_BYTES : firstPage.bytes.length;
+                    buffer = firstPage.bytes;
+                    releasable = byteArray;
+                } else {
+                    buffer = new byte[size];
+                    releasable = null;
+                }
+                this.bigArrays = bigArrays;
+                maxUsed = 0;
+            }
+
+            RecyclingBytesStreamOutput streamOutput() {
+                return new RecyclingBytesStreamOutput(buffer, bigArrays) {
+                    @Override
+                    public BytesRef toBytesRef() {
+                        final BytesRef bytesRef = super.toBytesRef();
+                        maxUsed = Math.max(maxUsed, bytesRef.length);
+                        if (buffer != bytesRef.bytes) {
+                            assert bytesRef.length > buffer.length;
+                            logger.trace("growing document buffer from [{}] to [{}]", buffer.length, maxUsed);
+                            buffer = bytesRef.bytes;
+                        }
+                        assert maxUsed <= buffer.length;
+                        return bytesRef;
+                    }
+                };
+            }
+
+            int getMaxUsed() {
+                return maxUsed;
+            }
+
+            @Override
+            public void close() {
+                Releasables.close(releasable);
+            }
         }
     }
 }
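
As a worked example of the `allocateBuffer()` heuristic above, assuming the usual 16KiB value for `PageCacheRecycler.PAGE_SIZE_IN_BYTES` (the exact constant is not shown in this diff):

// documentBufferUsed =  1,000 -> extraSpace = 0      -> new DocumentBuffer(1,000):  backed by one recycled 16KiB page
// documentBufferUsed = 50,000 -> extraSpace = 16,384 -> new DocumentBuffer(66,384): a fresh 66,384-byte array with headroom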


@@ -0,0 +1,80 @@
/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.common.bytes;

import org.apache.lucene.util.BytesRef;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.util.MockBigArrays;
import org.elasticsearch.common.util.MockPageCacheRecycler;
import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.sameInstance;

public class RecyclingBytesStreamOutputTests extends ESTestCase {

    public void testReturnsWrittenBytesAndRecyclesBufferIfPossible() throws IOException {

        final byte[] source = randomUnicodeOfLength(scaledRandomIntBetween(0, 20000)).getBytes(StandardCharsets.UTF_8);
        final byte[] buffer = new byte[scaledRandomIntBetween(0, 20000)];

        final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
        try (RecyclingBytesStreamOutput output = new RecyclingBytesStreamOutput(buffer, bigArrays)) {
            int position = 0;
            while (position < source.length) {
                if (randomBoolean()) {
                    output.writeByte(source[position++]);
                } else {
                    final int length = randomIntBetween(1, source.length - position);
                    final int sliceStart = randomIntBetween(0, position);
                    final int sliceEnd = randomIntBetween(position + length, source.length);
                    final byte[] slice = new byte[sliceEnd - sliceStart];
                    System.arraycopy(source, sliceStart, slice, 0, slice.length);
                    output.writeBytes(slice, position - sliceStart, length);
                    position += length;
                }
            }

            final BytesRef bytesRef;
            if (randomBoolean()) {
                bytesRef = output.toBytesRef();
                assertThat(bytesRef.offset, equalTo(0));

                if (source.length <= buffer.length) {
                    assertThat("should have re-used the same buffer", bytesRef.bytes, sameInstance(buffer));
                } else {
                    assertThat("new buffer should be the right size", bytesRef.bytes.length, equalTo(source.length));
                }
            } else {
                bytesRef = output.bytes().toBytesRef();
            }

            assertThat(bytesRef.length, equalTo(source.length));

            final byte[] trimmed = new byte[source.length];
            System.arraycopy(bytesRef.bytes, bytesRef.offset, trimmed, 0, bytesRef.length);
            assertArrayEquals(source, trimmed);
        }
    }
}


@@ -38,11 +38,14 @@ import org.elasticsearch.cluster.service.ClusterService;
 import org.elasticsearch.common.settings.ClusterSettings;
 import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.common.util.set.Sets;
 import org.elasticsearch.core.internal.io.IOUtils;
 import org.elasticsearch.env.Environment;
 import org.elasticsearch.env.NodeEnvironment;
 import org.elasticsearch.env.TestEnvironment;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.node.Node;
 import org.elasticsearch.test.ESTestCase;
 import org.elasticsearch.threadpool.TestThreadPool;
@@ -71,9 +74,11 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase {
     private ClusterName clusterName;
     private Settings settings;
     private DiscoveryNode localNode;
+    private BigArrays bigArrays;
 
     @Override
     public void setUp() throws Exception {
+        bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
         nodeEnvironment = newNodeEnvironment();
         localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(),
             Sets.newHashSet(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT);
@@ -89,7 +94,7 @@
     }
 
     private CoordinationState.PersistedState newGatewayPersistedState() {
-        final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
+        final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays);
         gateway.start(settings, nodeEnvironment, xContentRegistry());
         final CoordinationState.PersistedState persistedState = gateway.getPersistedState();
         assertThat(persistedState, instanceOf(GatewayMetaState.LucenePersistedState.class));
@@ -298,7 +303,7 @@
     public void testStatePersistedOnLoad() throws IOException {
         // open LucenePersistedState to make sure that cluster state is written out to each data path
         final PersistedClusterStateService persistedClusterStateService =
-            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
         final ClusterState state = createClusterState(randomNonNegativeLong(),
             Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build());
@@ -316,7 +321,7 @@
                 .put(Environment.PATH_DATA_SETTING.getKey(), path.getParent().getParent().toString()).build();
             try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
                 final PersistedClusterStateService newPersistedClusterStateService =
-                    new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                    new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                         new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
                 final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
                 assertFalse(onDiskState.empty());
@@ -340,7 +345,7 @@
             .put(nonMasterNode())
             .put(Node.NODE_NAME_SETTING.getKey(), "test")
             .build();
-        final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode);
+        final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays);
         cleanup.add(gateway);
         final TransportService transportService = mock(TransportService.class);
         TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode");
@@ -350,7 +355,7 @@
         when(clusterService.getClusterSettings()).thenReturn(
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS));
         final PersistedClusterStateService persistedClusterStateService =
-            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
         gateway.start(settings, transportService, clusterService,
             new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService);
@@ -437,7 +442,7 @@
         final AtomicReference<Double> ioExceptionRate = new AtomicReference<>(0.01d);
         final List<MockDirectoryWrapper> list = new ArrayList<>();
         final PersistedClusterStateService persistedClusterStateService =
-            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+            new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
                 @Override
                 Directory createDirectory(Path path) {
@@ -514,7 +519,7 @@
                 .put(Environment.PATH_DATA_SETTING.getKey(), path.getParent().getParent().toString()).build();
             try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) {
                 final PersistedClusterStateService newPersistedClusterStateService =
-                    new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                    new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                         new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L);
                 final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState();
                 assertFalse(onDiskState.empty());
@@ -527,4 +532,10 @@
         }
     }
 
+    private static BigArrays getBigArrays() {
+        return usually()
+            ? BigArrays.NON_RECYCLING_INSTANCE
+            : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+    }
+
 }


@@ -77,10 +77,7 @@ import static org.hamcrest.Matchers.nullValue;
 public class PersistedClusterStateServiceTests extends ESTestCase {
 
     private PersistedClusterStateService newPersistedClusterStateService(NodeEnvironment nodeEnvironment) {
-        return new PersistedClusterStateService(nodeEnvironment, xContentRegistry(),
-            usually()
-                ? BigArrays.NON_RECYCLING_INSTANCE
-                : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
+        return new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
             new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
             () -> 0L);
     }
@@ -358,7 +355,7 @@
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
             final PersistedClusterStateService persistedClusterStateService
-                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
                 @Override
                 Directory createDirectory(Path path) throws IOException {
@@ -396,7 +393,7 @@
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
             final PersistedClusterStateService persistedClusterStateService
-                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
                 @Override
                 Directory createDirectory(Path path) throws IOException {
@@ -442,7 +439,7 @@
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
             final PersistedClusterStateService persistedClusterStateService
-                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(),
                 new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) {
                 @Override
                 Directory createDirectory(Path path) throws IOException {
@@ -800,12 +797,7 @@
         final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
         try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) {
             PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService(nodeEnvironment,
-                xContentRegistry(),
-                usually()
-                    ? BigArrays.NON_RECYCLING_INSTANCE
-                    : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()),
-                clusterSettings,
-                () -> currentTime.getAndAdd(writeDurationMillis.get()));
+                xContentRegistry(), getBigArrays(), clusterSettings, () -> currentTime.getAndAdd(writeDurationMillis.get()));
 
             try (Writer writer = persistedClusterStateService.createWriter()) {
                 assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.SeenEventExpectation(
@@ -923,5 +915,10 @@
         return ClusterState.builder(ClusterName.DEFAULT).version(version).metadata(metadata).build();
     }
 
+    private static BigArrays getBigArrays() {
+        return usually()
+            ? BigArrays.NON_RECYCLING_INSTANCE
+            : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
+    }
+
 }


@@ -57,6 +57,8 @@ import org.elasticsearch.common.settings.Settings;
 import org.elasticsearch.common.transport.TransportAddress;
 import org.elasticsearch.common.unit.TimeValue;
 import org.elasticsearch.common.util.BigArrays;
+import org.elasticsearch.common.util.MockBigArrays;
+import org.elasticsearch.common.util.MockPageCacheRecycler;
 import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor;
 import org.elasticsearch.discovery.DiscoveryModule;
 import org.elasticsearch.discovery.SeedHostsProvider;
@@ -65,6 +67,7 @@ import org.elasticsearch.gateway.ClusterStateUpdaters;
 import org.elasticsearch.gateway.GatewayService;
 import org.elasticsearch.gateway.MockGatewayMetaState;
 import org.elasticsearch.gateway.PersistedClusterStateService;
+import org.elasticsearch.indices.breaker.NoneCircuitBreakerService;
 import org.elasticsearch.monitor.NodeHealthService;
 import org.elasticsearch.monitor.StatusInfo;
 import org.elasticsearch.test.ESTestCase;
@@ -257,7 +260,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase {
         private final Map<Long, ClusterState> committedStatesByVersion = new HashMap<>();
         private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker();
         private final History history = new History();
-        private NodeHealthService nodeHealthService;
+        private final BigArrays bigArrays;
+        private final NodeHealthService nodeHealthService;
 
         private final Function<DiscoveryNode, MockPersistedState> defaultPersistedStateSupplier = MockPersistedState::new;
@@ -274,6 +278,9 @@
         Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) {
             this.nodeHealthService = nodeHealthService;
+            bigArrays = usually()
+                ? BigArrays.NON_RECYCLING_INSTANCE
+                : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService());
             deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY);
 
             assertThat(initialNodeCount, greaterThan(0));
@@ -738,7 +745,7 @@
                 if (rarely()) {
                     nodeEnvironment = newNodeEnvironment();
                     nodeEnvironments.add(nodeEnvironment);
-                    final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode);
+                    final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode, bigArrays);
                     gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
                     delegate = gatewayMetaState.getPersistedState();
                 } else {
@@ -762,7 +769,7 @@
                         final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm());
                         if (updatedMetadata != oldState.getLastAcceptedState().metadata() || updatedTerm != oldState.getCurrentTerm()) {
                             try (PersistedClusterStateService.Writer writer =
-                                new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE,
+                                new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), bigArrays,
                                     new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
                                     deterministicTaskQueue::getCurrentTimeMillis)
                                     .createWriter()) {
@@ -770,7 +777,7 @@
                                     ClusterState.builder(oldState.getLastAcceptedState()).metadata(updatedMetadata).build());
                             }
                         }
-                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode);
+                        final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode, bigArrays);
                         gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry());
                         delegate = gatewayMetaState.getPersistedState();
                     } else {


@@ -48,9 +48,11 @@ import static org.mockito.Mockito.when;
  */
 public class MockGatewayMetaState extends GatewayMetaState {
     private final DiscoveryNode localNode;
+    private final BigArrays bigArrays;
 
-    public MockGatewayMetaState(DiscoveryNode localNode) {
+    public MockGatewayMetaState(DiscoveryNode localNode, BigArrays bigArrays) {
         this.localNode = localNode;
+        this.bigArrays = bigArrays;
     }
 
     @Override
@@ -79,7 +81,7 @@ public class MockGatewayMetaState extends GatewayMetaState {
             throw new AssertionError(e);
         }
 
         start(settings, transportService, clusterService, metaStateService,
-            null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE,
+            null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays,
                 new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L));
     }
} }