diff --git a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java index d722f946d63..92b92ac1cef 100644 --- a/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java +++ b/server/src/main/java/org/elasticsearch/cluster/coordination/JoinHelper.java @@ -253,7 +253,7 @@ public class JoinHelper { } void logWarnWithTimestamp() { - logger.info(() -> new ParameterizedMessage("last failed join attempt was {} ago, failed to join {} with {}", + logger.warn(() -> new ParameterizedMessage("last failed join attempt was {} ago, failed to join {} with {}", TimeValue.timeValueMillis(TimeValue.nsecToMSec(System.nanoTime() - timestamp)), destination, joinRequest), diff --git a/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java b/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java new file mode 100644 index 00000000000..38c83e9b1f9 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutput.java @@ -0,0 +1,148 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.elasticsearch.common.bytes; + +import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; +import org.elasticsearch.common.Nullable; +import org.elasticsearch.common.io.stream.BytesStream; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ByteArray; +import org.elasticsearch.core.internal.io.IOUtils; + +import java.io.IOException; +import java.util.Objects; + +/** + * An in-memory {@link StreamOutput} which first fills the given {@code byte[]} and then allocates more space from the given + * {@link BigArrays} if needed. The idea is that you can use this for passing data to an API that requires a single {@code byte[]} (or a + * {@link org.apache.lucene.util.BytesRef}) which you'd prefer to re-use if possible, avoiding excessive allocations, but which may not + * always be large enough. 
+ */ +public class RecyclingBytesStreamOutput extends BytesStream { + + private final byte[] buffer; + private final BigArrays bigArrays; + + private int position; + + @Nullable // if buffer is large enough + private ByteArray overflow; + + public RecyclingBytesStreamOutput(byte[] buffer, BigArrays bigArrays) { + this.buffer = Objects.requireNonNull(buffer); + this.bigArrays = Objects.requireNonNull(bigArrays); + } + + @Override + public void writeByte(byte b) { + if (position < buffer.length) { + buffer[position++] = b; + } else { + ensureCapacity(position + 1); + overflow.set(position++ - buffer.length, b); + } + } + + private void ensureCapacity(int size) { + final int overflowSize = size - buffer.length; + assert overflowSize > 0 : "no need to ensureCapacity(" + size + ") with buffer of size [" + buffer.length + "]"; + assert position >= buffer.length + : "no need to ensureCapacity(" + size + ") with buffer of size [" + buffer.length + "] at position [" + position + "]"; + if (overflow == null) { + overflow = bigArrays.newByteArray(overflowSize, false); + } else if (overflowSize > overflow.size()) { + overflow = bigArrays.resize(overflow, overflowSize); + } + assert overflow.size() >= overflowSize; + } + + @Override + public void writeBytes(byte[] b, int offset, int length) { + if (position < buffer.length) { + final int lengthForBuffer = Math.min(length, buffer.length - position); + System.arraycopy(b, offset, buffer, position, lengthForBuffer); + position += lengthForBuffer; + offset += lengthForBuffer; + length -= lengthForBuffer; + } + + if (length > 0) { + ensureCapacity(position + length); + overflow.set(position - buffer.length, b, offset, length); + position += length; + } + } + + @Override + public void flush() { + } + + @Override + public void close() throws IOException { + IOUtils.close(overflow); + } + + @Override + public void reset() throws IOException { + throw new UnsupportedOperationException(); + } + + /** + * Return the written bytes in a {@link BytesRef}, avoiding allocating a new {@code byte[]} if the original buffer was already large + * enough. If we allocate a new (larger) buffer here then callers should typically re-use it for subsequent streams. 
+ */ + public BytesRef toBytesRef() { + if (position <= buffer.length) { + assert overflow == null; + return new BytesRef(buffer, 0, position); + } + + final byte[] newBuffer = new byte[position]; + System.arraycopy(buffer, 0, newBuffer, 0, buffer.length); + int copyPos = buffer.length; + final BytesRefIterator iterator = new PagedBytesReference(overflow, position - buffer.length).iterator(); + BytesRef bytesRef; + try { + while ((bytesRef = iterator.next()) != null) { + assert copyPos + bytesRef.length <= position; + System.arraycopy(bytesRef.bytes, bytesRef.offset, newBuffer, copyPos, bytesRef.length); + copyPos += bytesRef.length; + } + } catch (IOException e) { + throw new AssertionError("impossible", e); + } + + return new BytesRef(newBuffer, 0, position); + } + + @Override + public BytesReference bytes() { + if (position <= buffer.length) { + assert overflow == null; + return new BytesArray(buffer, 0, position); + } else { + return CompositeBytesReference.of( + new BytesArray(buffer, 0, buffer.length), + new PagedBytesReference(overflow, position - buffer.length)); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java index 1b5bb0ac8c0..ef733223762 100644 --- a/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/gateway/PersistedClusterStateService.java @@ -46,6 +46,7 @@ import org.apache.lucene.store.Directory; import org.apache.lucene.store.SimpleFSDirectory; import org.apache.lucene.util.Bits; import org.apache.lucene.util.BytesRef; +import org.apache.lucene.util.BytesRefIterator; import org.apache.lucene.util.SetOnce; import org.elasticsearch.Version; import org.elasticsearch.cluster.ClusterState; @@ -53,14 +54,19 @@ import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.common.CheckedConsumer; import org.elasticsearch.common.Nullable; -import org.elasticsearch.common.io.stream.ReleasableBytesStreamOutput; +import org.elasticsearch.common.bytes.PagedBytesReference; +import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput; +import org.elasticsearch.common.io.Streams; import org.elasticsearch.common.lease.Releasable; +import org.elasticsearch.common.lease.Releasables; import org.elasticsearch.common.logging.Loggers; import org.elasticsearch.common.lucene.Lucene; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Setting; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.ByteArray; +import org.elasticsearch.common.util.PageCacheRecycler; import org.elasticsearch.common.xcontent.LoggingDeprecationHandler; import org.elasticsearch.common.xcontent.NamedXContentRegistry; import org.elasticsearch.common.xcontent.ToXContent; @@ -73,7 +79,6 @@ import org.elasticsearch.env.NodeMetadata; import org.elasticsearch.index.Index; import java.io.Closeable; -import java.io.FilterOutputStream; import java.io.IOError; import java.io.IOException; import java.nio.file.Files; @@ -452,29 +457,6 @@ public class PersistedClusterStateService { FORMAT_PARAMS = new ToXContent.MapParams(params); } - /** - * A {@link Document} with a stored field containing serialized metadata written to a {@link ReleasableBytesStreamOutput} which must be - * released when no longer needed. 
- */ - private static class ReleasableDocument implements Releasable { - private final Document document; - private final Releasable releasable; - - ReleasableDocument(Document document, Releasable releasable) { - this.document = document; - this.releasable = releasable; - } - - Document getDocument() { - return document; - } - - @Override - public void close() { - releasable.close(); - } - } - /** * Encapsulates a single {@link IndexWriter} with its {@link Directory} for ease of closing, and a {@link Logger}. There is one of these * for each data path. @@ -547,6 +529,10 @@ public class PersistedClusterStateService { boolean fullStateWritten = false; private final AtomicBoolean closed = new AtomicBoolean(); + // The size of the document buffer that was used for the last write operation, used as a hint for allocating the buffer for the + // next one. + private int documentBufferUsed; + private Writer(List metadataIndexWriters, String nodeId, BigArrays bigArrays, LongSupplier relativeTimeMillisSupplier, Supplier slowWriteLoggingThresholdSupplier) { this.metadataIndexWriters = metadataIndexWriters; @@ -650,56 +636,60 @@ public class PersistedClusterStateService { logger.trace("currentTerm [{}] matches previous currentTerm, writing changes only", metadata.coordinationMetadata().term()); - final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previouslyWrittenMetadata, metadata) == false; - if (updateGlobalMeta) { - try (ReleasableDocument globalMetadataDocument = makeGlobalMetadataDocument(metadata)) { + try (DocumentBuffer documentBuffer = allocateBuffer()) { + + final boolean updateGlobalMeta = Metadata.isGlobalStateEquals(previouslyWrittenMetadata, metadata) == false; + if (updateGlobalMeta) { + final Document globalMetadataDocument = makeGlobalMetadataDocument(metadata, documentBuffer); for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument.getDocument()); + metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument); } } - } - final Map indexMetadataVersionByUUID = new HashMap<>(previouslyWrittenMetadata.indices().size()); - for (ObjectCursor cursor : previouslyWrittenMetadata.indices().values()) { - final IndexMetadata indexMetadata = cursor.value; - final Long previousValue = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion()); - assert previousValue == null : indexMetadata.getIndexUUID() + " already mapped to " + previousValue; - } + final Map indexMetadataVersionByUUID = new HashMap<>(previouslyWrittenMetadata.indices().size()); + for (ObjectCursor cursor : previouslyWrittenMetadata.indices().values()) { + final IndexMetadata indexMetadata = cursor.value; + final Long previousValue + = indexMetadataVersionByUUID.putIfAbsent(indexMetadata.getIndexUUID(), indexMetadata.getVersion()); + assert previousValue == null : indexMetadata.getIndexUUID() + " already mapped to " + previousValue; + } - int numIndicesUpdated = 0; - int numIndicesUnchanged = 0; - for (ObjectCursor cursor : metadata.indices().values()) { - final IndexMetadata indexMetadata = cursor.value; - final Long previousVersion = indexMetadataVersionByUUID.get(indexMetadata.getIndexUUID()); - if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { - logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", - indexMetadata.getIndex(), previousVersion, indexMetadata.getVersion()); - numIndicesUpdated++; - try (ReleasableDocument 
indexMetadataDocument = makeIndexMetadataDocument(indexMetadata)) { + int numIndicesUpdated = 0; + int numIndicesUnchanged = 0; + for (ObjectCursor cursor : metadata.indices().values()) { + final IndexMetadata indexMetadata = cursor.value; + final Long previousVersion = indexMetadataVersionByUUID.get(indexMetadata.getIndexUUID()); + if (previousVersion == null || indexMetadata.getVersion() != previousVersion) { + logger.trace("updating metadata for [{}], changing version from [{}] to [{}]", + indexMetadata.getIndex(), previousVersion, indexMetadata.getVersion()); + numIndicesUpdated++; + final Document indexMetadataDocument = makeIndexMetadataDocument(indexMetadata, documentBuffer); for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument.getDocument(), indexMetadata.getIndex()); + metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument, indexMetadata.getIndex()); } + } else { + numIndicesUnchanged++; + logger.trace("no action required for [{}]", indexMetadata.getIndex()); } - } else { - numIndicesUnchanged++; - logger.trace("no action required for [{}]", indexMetadata.getIndex()); + indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID()); } - indexMetadataVersionByUUID.remove(indexMetadata.getIndexUUID()); - } - for (String removedIndexUUID : indexMetadataVersionByUUID.keySet()) { + documentBufferUsed = documentBuffer.getMaxUsed(); + + for (String removedIndexUUID : indexMetadataVersionByUUID.keySet()) { + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.deleteIndexMetadata(removedIndexUUID); + } + } + + // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more + // gracefully than one that occurs during the commit process. for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.deleteIndexMetadata(removedIndexUUID); + metadataIndexWriter.flush(); } - } - // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more - // gracefully than one that occurs during the commit process. - for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.flush(); + return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged); } - - return new WriterStats(updateGlobalMeta, numIndicesUpdated, numIndicesUnchanged); } /** @@ -716,28 +706,39 @@ public class PersistedClusterStateService { * Add documents for the metadata of the given cluster state, assuming that there are currently no documents. 
*/ private WriterStats addMetadata(Metadata metadata) throws IOException { - try (ReleasableDocument globalMetadataDocument = makeGlobalMetadataDocument(metadata)) { - for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument.getDocument()); - } - } + try (DocumentBuffer documentBuffer = allocateBuffer()) { - for (ObjectCursor cursor : metadata.indices().values()) { - final IndexMetadata indexMetadata = cursor.value; - try (ReleasableDocument indexMetadataDocument = makeIndexMetadataDocument(indexMetadata)) { + final Document globalMetadataDocument = makeGlobalMetadataDocument(metadata, documentBuffer); + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.updateGlobalMetadata(globalMetadataDocument); + } + + for (ObjectCursor cursor : metadata.indices().values()) { + final IndexMetadata indexMetadata = cursor.value; + final Document indexMetadataDocument = makeIndexMetadataDocument(indexMetadata, documentBuffer); for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument.getDocument(), indexMetadata.getIndex()); + metadataIndexWriter.updateIndexMetadataDocument(indexMetadataDocument, indexMetadata.getIndex()); } } - } - // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more - // gracefully than one that occurs during the commit process. - for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { - metadataIndexWriter.flush(); - } + documentBufferUsed = documentBuffer.getMaxUsed(); - return new WriterStats(true, metadata.indices().size(), 0); + // Flush, to try and expose a failure (e.g. out of disk space) before committing, because we can handle a failure here more + // gracefully than one that occurs during the commit process. + for (MetadataIndexWriter metadataIndexWriter : metadataIndexWriters) { + metadataIndexWriter.flush(); + } + + return new WriterStats(true, metadata.indices().size(), 0); + } + } + + private DocumentBuffer allocateBuffer() { + // heuristics for picking the initial buffer size based on the buffer we needed last time: try and fit within a single page, + // but if we needed more than a single page last time then allow a bit more space to try and avoid needing to grow the buffer + // later on. + final int extraSpace = documentBufferUsed <= PageCacheRecycler.PAGE_SIZE_IN_BYTES ? 
0 : PageCacheRecycler.PAGE_SIZE_IN_BYTES; + return new DocumentBuffer(documentBufferUsed + extraSpace, bigArrays); } public void writeIncrementalTermUpdateAndCommit(long currentTerm, long lastAcceptedVersion) throws IOException { @@ -802,59 +803,97 @@ public class PersistedClusterStateService { } } - private ReleasableDocument makeIndexMetadataDocument(IndexMetadata indexMetadata) throws IOException { - final ReleasableDocument indexMetadataDocument = makeDocument(INDEX_TYPE_NAME, indexMetadata); - boolean success = false; - try { - final String indexUUID = indexMetadata.getIndexUUID(); - assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false; - indexMetadataDocument.getDocument().add(new StringField(INDEX_UUID_FIELD_NAME, indexUUID, Field.Store.NO)); - success = true; - return indexMetadataDocument; - } finally { - if (success == false) { - IOUtils.closeWhileHandlingException(indexMetadataDocument); - } - } + private Document makeIndexMetadataDocument(IndexMetadata indexMetadata, DocumentBuffer documentBuffer) throws IOException { + final Document indexMetadataDocument = makeDocument(INDEX_TYPE_NAME, indexMetadata, documentBuffer); + final String indexUUID = indexMetadata.getIndexUUID(); + assert indexUUID.equals(IndexMetadata.INDEX_UUID_NA_VALUE) == false; + indexMetadataDocument.add(new StringField(INDEX_UUID_FIELD_NAME, indexUUID, Field.Store.NO)); + return indexMetadataDocument; } - private ReleasableDocument makeGlobalMetadataDocument(Metadata metadata) throws IOException { - return makeDocument(GLOBAL_TYPE_NAME, metadata); + private Document makeGlobalMetadataDocument(Metadata metadata, DocumentBuffer documentBuffer) throws IOException { + return makeDocument(GLOBAL_TYPE_NAME, metadata, documentBuffer); } - private ReleasableDocument makeDocument(String typeName, ToXContent metadata) throws IOException { + private Document makeDocument(String typeName, ToXContent metadata, DocumentBuffer documentBuffer) throws IOException { final Document document = new Document(); document.add(new StringField(TYPE_FIELD_NAME, typeName, Field.Store.NO)); - boolean success = false; - final ReleasableBytesStreamOutput releasableBytesStreamOutput = new ReleasableBytesStreamOutput(bigArrays); - try { - final FilterOutputStream outputStream = new FilterOutputStream(releasableBytesStreamOutput) { - - @Override - public void write(byte[] b, int off, int len) throws IOException { - out.write(b, off, len); - } - - @Override - public void close() { - // closing the XContentBuilder should not release the bytes yet - } - }; - try (XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.SMILE, outputStream)) { + try (RecyclingBytesStreamOutput streamOutput = documentBuffer.streamOutput()) { + try (XContentBuilder xContentBuilder = XContentFactory.contentBuilder(XContentType.SMILE, + Streams.flushOnCloseStream(streamOutput))) { xContentBuilder.startObject(); metadata.toXContent(xContentBuilder, FORMAT_PARAMS); xContentBuilder.endObject(); } - document.add(new StoredField(DATA_FIELD_NAME, releasableBytesStreamOutput.bytes().toBytesRef())); - final ReleasableDocument releasableDocument = new ReleasableDocument(document, releasableBytesStreamOutput); - success = true; - return releasableDocument; - } finally { - if (success == false) { - IOUtils.closeWhileHandlingException(releasableBytesStreamOutput); - } + document.add(new StoredField(DATA_FIELD_NAME, streamOutput.toBytesRef())); } + + return document; + } + } + + /** + * Holds the current buffer, keeping track of new allocations 
as it grows. + */ + private static class DocumentBuffer implements Releasable { + private final BigArrays bigArrays; + + @Nullable // if the initial page doesn't need releasing + private final Releasable releasable; + private byte[] buffer; + private int maxUsed; + + DocumentBuffer(int size, BigArrays bigArrays) { + if (size <= PageCacheRecycler.PAGE_SIZE_IN_BYTES) { + final ByteArray byteArray = bigArrays.newByteArray(PageCacheRecycler.PAGE_SIZE_IN_BYTES); + final BytesRefIterator iterator = new PagedBytesReference(byteArray, Math.toIntExact(byteArray.size())).iterator(); + final BytesRef firstPage; + try { + firstPage = iterator.next(); + assert iterator.next() == null : "should be one page"; + } catch (IOException e) { + throw new AssertionError("impossible", e); + } + + // we require that we have the whole page to ourselves + assert firstPage.offset == 0 : firstPage.offset; + assert firstPage.bytes.length == PageCacheRecycler.PAGE_SIZE_IN_BYTES : firstPage.bytes.length; + buffer = firstPage.bytes; + releasable = byteArray; + } else { + buffer = new byte[size]; + releasable = null; + } + this.bigArrays = bigArrays; + maxUsed = 0; + } + + RecyclingBytesStreamOutput streamOutput() { + return new RecyclingBytesStreamOutput(buffer, bigArrays) { + @Override + public BytesRef toBytesRef() { + final BytesRef bytesRef = super.toBytesRef(); + maxUsed = Math.max(maxUsed, bytesRef.length); + if (buffer != bytesRef.bytes) { + assert bytesRef.length > buffer.length; + logger.trace("growing document buffer from [{}] to [{}]", buffer.length, maxUsed); + buffer = bytesRef.bytes; + } + assert maxUsed <= buffer.length; + return bytesRef; + } + }; + } + + int getMaxUsed() { + return maxUsed; + } + + @Override + public void close() { + Releasables.close(releasable); } } } + diff --git a/server/src/test/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutputTests.java b/server/src/test/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutputTests.java new file mode 100644 index 00000000000..cbc00ede9c3 --- /dev/null +++ b/server/src/test/java/org/elasticsearch/common/bytes/RecyclingBytesStreamOutputTests.java @@ -0,0 +1,80 @@ +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.elasticsearch.common.bytes; + +import org.apache.lucene.util.BytesRef; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; +import org.elasticsearch.test.ESTestCase; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.sameInstance; + +public class RecyclingBytesStreamOutputTests extends ESTestCase { + + public void testReturnsWrittenBytesAndRecyclesBufferIfPossible() throws IOException { + + final byte[] source = randomUnicodeOfLength(scaledRandomIntBetween(0, 20000)).getBytes(StandardCharsets.UTF_8); + final byte[] buffer = new byte[scaledRandomIntBetween(0, 20000)]; + + final MockBigArrays bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + try (RecyclingBytesStreamOutput output = new RecyclingBytesStreamOutput(buffer, bigArrays)) { + int position = 0; + while (position < source.length) { + if (randomBoolean()) { + output.writeByte(source[position++]); + } else { + final int length = randomIntBetween(1, source.length - position); + final int sliceStart = randomIntBetween(0, position); + final int sliceEnd = randomIntBetween(position + length, source.length); + final byte[] slice = new byte[sliceEnd - sliceStart]; + System.arraycopy(source, sliceStart, slice, 0, slice.length); + output.writeBytes(slice, position - sliceStart, length); + position += length; + } + } + + final BytesRef bytesRef; + + if (randomBoolean()) { + bytesRef = output.toBytesRef(); + assertThat(bytesRef.offset, equalTo(0)); + + if (source.length <= buffer.length) { + assertThat("should have re-used the same buffer", bytesRef.bytes, sameInstance(buffer)); + } else { + assertThat("new buffer should be the right size", bytesRef.bytes.length, equalTo(source.length)); + } + } else { + bytesRef = output.bytes().toBytesRef(); + } + + assertThat(bytesRef.length, equalTo(source.length)); + final byte[] trimmed = new byte[source.length]; + System.arraycopy(bytesRef.bytes, bytesRef.offset, trimmed, 0, bytesRef.length); + assertArrayEquals(source, trimmed); + } + } +} diff --git a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java index 0507962447b..79931dc8f1b 100644 --- a/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/GatewayMetaStatePersistedStateTests.java @@ -38,11 +38,14 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.set.Sets; import org.elasticsearch.core.internal.io.IOUtils; import org.elasticsearch.env.Environment; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.env.TestEnvironment; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.node.Node; import org.elasticsearch.test.ESTestCase; import org.elasticsearch.threadpool.TestThreadPool; @@ -71,9 +74,11 @@ public 
class GatewayMetaStatePersistedStateTests extends ESTestCase { private ClusterName clusterName; private Settings settings; private DiscoveryNode localNode; + private BigArrays bigArrays; @Override public void setUp() throws Exception { + bigArrays = new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); nodeEnvironment = newNodeEnvironment(); localNode = new DiscoveryNode("node1", buildNewFakeTransportAddress(), Collections.emptyMap(), Sets.newHashSet(DiscoveryNodeRole.MASTER_ROLE), Version.CURRENT); @@ -89,7 +94,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } private CoordinationState.PersistedState newGatewayPersistedState() { - final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode); + final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays); gateway.start(settings, nodeEnvironment, xContentRegistry()); final CoordinationState.PersistedState persistedState = gateway.getPersistedState(); assertThat(persistedState, instanceOf(GatewayMetaState.LucenePersistedState.class)); @@ -298,7 +303,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { public void testStatePersistedOnLoad() throws IOException { // open LucenePersistedState to make sure that cluster state is written out to each data path final PersistedClusterStateService persistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L); final ClusterState state = createClusterState(randomNonNegativeLong(), Metadata.builder().clusterUUID(randomAlphaOfLength(10)).build()); @@ -316,7 +321,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { .put(Environment.PATH_DATA_SETTING.getKey(), path.getParent().getParent().toString()).build(); try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) { final PersistedClusterStateService newPersistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L); final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState(); assertFalse(onDiskState.empty()); @@ -340,7 +345,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { .put(nonMasterNode()) .put(Node.NODE_NAME_SETTING.getKey(), "test") .build(); - final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode); + final MockGatewayMetaState gateway = new MockGatewayMetaState(localNode, bigArrays); cleanup.add(gateway); final TransportService transportService = mock(TransportService.class); TestThreadPool threadPool = new TestThreadPool("testMarkAcceptedConfigAsCommittedOnDataOnlyNode"); @@ -350,7 +355,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { when(clusterService.getClusterSettings()).thenReturn( new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); final PersistedClusterStateService persistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new 
PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L); gateway.start(settings, transportService, clusterService, new MetaStateService(nodeEnvironment, xContentRegistry()), null, null, persistedClusterStateService); @@ -437,7 +442,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { final AtomicReference ioExceptionRate = new AtomicReference<>(0.01d); final List list = new ArrayList<>(); final PersistedClusterStateService persistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) { @Override Directory createDirectory(Path path) { @@ -514,7 +519,7 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { .put(Environment.PATH_DATA_SETTING.getKey(), path.getParent().getParent().toString()).build(); try (NodeEnvironment nodeEnvironment = new NodeEnvironment(settings, TestEnvironment.newEnvironment(settings))) { final PersistedClusterStateService newPersistedClusterStateService = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(), new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L); final PersistedClusterStateService.OnDiskState onDiskState = newPersistedClusterStateService.loadBestOnDiskState(); assertFalse(onDiskState.empty()); @@ -527,4 +532,10 @@ public class GatewayMetaStatePersistedStateTests extends ESTestCase { } } + private static BigArrays getBigArrays() { + return usually() + ? BigArrays.NON_RECYCLING_INSTANCE + : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + } + } diff --git a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java index 30924f4b902..7d7b08b1079 100644 --- a/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java +++ b/server/src/test/java/org/elasticsearch/gateway/PersistedClusterStateServiceTests.java @@ -77,10 +77,7 @@ import static org.hamcrest.Matchers.nullValue; public class PersistedClusterStateServiceTests extends ESTestCase { private PersistedClusterStateService newPersistedClusterStateService(NodeEnvironment nodeEnvironment) { - return new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), - usually() - ? 
BigArrays.NON_RECYCLING_INSTANCE - : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), + return new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L); } @@ -358,7 +355,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { final PersistedClusterStateService persistedClusterStateService - = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) { @Override Directory createDirectory(Path path) throws IOException { @@ -396,7 +393,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { final PersistedClusterStateService persistedClusterStateService - = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) { @Override Directory createDirectory(Path path) throws IOException { @@ -442,7 +439,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase { try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { final PersistedClusterStateService persistedClusterStateService - = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + = new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), getBigArrays(), new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L) { @Override Directory createDirectory(Path path) throws IOException { @@ -800,12 +797,7 @@ public class PersistedClusterStateServiceTests extends ESTestCase { final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS); try (NodeEnvironment nodeEnvironment = newNodeEnvironment(createDataPaths())) { PersistedClusterStateService persistedClusterStateService = new PersistedClusterStateService(nodeEnvironment, - xContentRegistry(), - usually() - ? BigArrays.NON_RECYCLING_INSTANCE - : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()), - clusterSettings, - () -> currentTime.getAndAdd(writeDurationMillis.get())); + xContentRegistry(), getBigArrays(), clusterSettings, () -> currentTime.getAndAdd(writeDurationMillis.get())); try (Writer writer = persistedClusterStateService.createWriter()) { assertExpectedLogs(1L, null, clusterState, writer, new MockLogAppender.SeenEventExpectation( @@ -923,5 +915,10 @@ public class PersistedClusterStateServiceTests extends ESTestCase { return ClusterState.builder(ClusterName.DEFAULT).version(version).metadata(metadata).build(); } + private static BigArrays getBigArrays() { + return usually() + ? 
BigArrays.NON_RECYCLING_INSTANCE + : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); + } } diff --git a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java index 5cac78b3dd8..37ae36c23d1 100644 --- a/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/cluster/coordination/AbstractCoordinatorTestCase.java @@ -57,6 +57,8 @@ import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.TransportAddress; import org.elasticsearch.common.unit.TimeValue; import org.elasticsearch.common.util.BigArrays; +import org.elasticsearch.common.util.MockBigArrays; +import org.elasticsearch.common.util.MockPageCacheRecycler; import org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor; import org.elasticsearch.discovery.DiscoveryModule; import org.elasticsearch.discovery.SeedHostsProvider; @@ -65,6 +67,7 @@ import org.elasticsearch.gateway.ClusterStateUpdaters; import org.elasticsearch.gateway.GatewayService; import org.elasticsearch.gateway.MockGatewayMetaState; import org.elasticsearch.gateway.PersistedClusterStateService; +import org.elasticsearch.indices.breaker.NoneCircuitBreakerService; import org.elasticsearch.monitor.NodeHealthService; import org.elasticsearch.monitor.StatusInfo; import org.elasticsearch.test.ESTestCase; @@ -257,7 +260,8 @@ public class AbstractCoordinatorTestCase extends ESTestCase { private final Map committedStatesByVersion = new HashMap<>(); private final LinearizabilityChecker linearizabilityChecker = new LinearizabilityChecker(); private final History history = new History(); - private NodeHealthService nodeHealthService; + private final BigArrays bigArrays; + private final NodeHealthService nodeHealthService; private final Function defaultPersistedStateSupplier = MockPersistedState::new; @@ -274,6 +278,9 @@ public class AbstractCoordinatorTestCase extends ESTestCase { Cluster(int initialNodeCount, boolean allNodesMasterEligible, Settings nodeSettings, NodeHealthService nodeHealthService) { this.nodeHealthService = nodeHealthService; + bigArrays = usually() + ? 
BigArrays.NON_RECYCLING_INSTANCE + : new MockBigArrays(new MockPageCacheRecycler(Settings.EMPTY), new NoneCircuitBreakerService()); deterministicTaskQueue.setExecutionDelayVariabilityMillis(DEFAULT_DELAY_VARIABILITY); assertThat(initialNodeCount, greaterThan(0)); @@ -738,7 +745,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { if (rarely()) { nodeEnvironment = newNodeEnvironment(); nodeEnvironments.add(nodeEnvironment); - final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode); + final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(localNode, bigArrays); gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry()); delegate = gatewayMetaState.getPersistedState(); } else { @@ -762,7 +769,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { final long updatedTerm = adaptCurrentTerm.apply(oldState.getCurrentTerm()); if (updatedMetadata != oldState.getLastAcceptedState().metadata() || updatedTerm != oldState.getCurrentTerm()) { try (PersistedClusterStateService.Writer writer = - new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), BigArrays.NON_RECYCLING_INSTANCE, + new PersistedClusterStateService(nodeEnvironment, xContentRegistry(), bigArrays, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), deterministicTaskQueue::getCurrentTimeMillis) .createWriter()) { @@ -770,7 +777,7 @@ public class AbstractCoordinatorTestCase extends ESTestCase { ClusterState.builder(oldState.getLastAcceptedState()).metadata(updatedMetadata).build()); } } - final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode); + final MockGatewayMetaState gatewayMetaState = new MockGatewayMetaState(newLocalNode, bigArrays); gatewayMetaState.start(Settings.EMPTY, nodeEnvironment, xContentRegistry()); delegate = gatewayMetaState.getPersistedState(); } else { diff --git a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java index 89c4dd7b22f..7eb27ae20ec 100644 --- a/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java +++ b/test/framework/src/main/java/org/elasticsearch/gateway/MockGatewayMetaState.java @@ -48,9 +48,11 @@ import static org.mockito.Mockito.when; */ public class MockGatewayMetaState extends GatewayMetaState { private final DiscoveryNode localNode; + private final BigArrays bigArrays; - public MockGatewayMetaState(DiscoveryNode localNode) { + public MockGatewayMetaState(DiscoveryNode localNode, BigArrays bigArrays) { this.localNode = localNode; + this.bigArrays = bigArrays; } @Override @@ -79,7 +81,7 @@ public class MockGatewayMetaState extends GatewayMetaState { throw new AssertionError(e); } start(settings, transportService, clusterService, metaStateService, - null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, BigArrays.NON_RECYCLING_INSTANCE, + null, null, new PersistedClusterStateService(nodeEnvironment, xContentRegistry, bigArrays, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), () -> 0L)); } }
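
A usage sketch for the RecyclingBytesStreamOutput introduced above: fill a caller-owned buffer first, spill into BigArrays pages only when it is too small, and keep any grown array for the next round. The class and method names here (RecyclingBufferUsage, serialize, payload) are illustrative, and a production caller would pass the node's real BigArrays rather than NON_RECYCLING_INSTANCE.

    import org.apache.lucene.util.BytesRef;
    import org.elasticsearch.common.bytes.RecyclingBytesStreamOutput;
    import org.elasticsearch.common.util.BigArrays;
    import org.elasticsearch.common.util.PageCacheRecycler;

    import java.io.IOException;

    class RecyclingBufferUsage {
        private byte[] buffer = new byte[PageCacheRecycler.PAGE_SIZE_IN_BYTES];

        BytesRef serialize(byte[] payload) throws IOException {
            try (RecyclingBytesStreamOutput out = new RecyclingBytesStreamOutput(buffer, BigArrays.NON_RECYCLING_INSTANCE)) {
                out.writeBytes(payload, 0, payload.length);
                final BytesRef bytesRef = out.toBytesRef(); // re-uses `buffer` when it was large enough
                if (bytesRef.bytes != buffer) {
                    buffer = bytesRef.bytes; // keep the grown array so the next call starts big enough
                }
                return bytesRef;
            } // close() releases any BigArrays overflow; the returned bytes were already copied out
        }
    }

This is the same re-use pattern DocumentBuffer applies inside the Writer: toBytesRef() hands back a larger array only when forced to allocate one, and the caller adopts it.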
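
The sizing heuristic in allocateBuffer() combines with the DocumentBuffer constructor so that the effective initial capacity is max(one page, lastUsed + one page of headroom). A standalone sketch of that arithmetic, assuming the default 16KiB page size (the helper name initialCapacity is invented for illustration):

    import org.elasticsearch.common.util.PageCacheRecycler;

    class BufferSizing {
        // Mirrors Writer#allocateBuffer plus the DocumentBuffer constructor: a write that
        // fit within one page keeps using a single recycled page; anything larger gets a
        // heap array with one page of headroom so modest growth avoids another copy.
        static int initialCapacity(int lastUsed) {
            final int pageSize = PageCacheRecycler.PAGE_SIZE_IN_BYTES; // 16384 by default
            final int requested = lastUsed + (lastUsed <= pageSize ? 0 : pageSize);
            // DocumentBuffer rounds small requests up to a whole recycled page
            return Math.max(requested, pageSize);
        }
        // e.g. lastUsed = 0     -> 16384 (one recycled page, released on close)
        //      lastUsed = 20000 -> 36384 (20000 + one page of headroom, plain heap array)
    }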
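
The makeDocument() change also replaces the hand-rolled FilterOutputStream with Streams.flushOnCloseStream(...), which wraps the output so that closing the XContentBuilder flushes but does not close the underlying stream — important here because the stream's buffer must stay usable for toBytesRef() after the builder is done. A condensed sketch of the pattern, with `document`, `metadata` and `params` standing in for the real arguments:

    try (RecyclingBytesStreamOutput streamOutput = documentBuffer.streamOutput()) {
        // the builder's close() only flushes the wrapped stream, so streamOutput
        // remains open and its recycled buffer is not released prematurely
        try (XContentBuilder builder = XContentFactory.contentBuilder(XContentType.SMILE,
                Streams.flushOnCloseStream(streamOutput))) {
            builder.startObject();
            metadata.toXContent(builder, params);
            builder.endObject();
        }
        document.add(new StoredField(DATA_FIELD_NAME, streamOutput.toBytesRef()));
    }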