diff --git a/lucene/CHANGES.txt b/lucene/CHANGES.txt index 9901dc0087d..bf8afb3a15f 100644 --- a/lucene/CHANGES.txt +++ b/lucene/CHANGES.txt @@ -107,9 +107,14 @@ API Changes * GITHUB#11761: TieredMergePolicy now allowed a maximum allowable deletes percentage of down to 5%, and the default maximum allowable deletes percentage is changed from 33% to 20%. (Marc D'Mello) - + * GITHUB#11822: Configure replicator PrimaryNode replia shutdown timeout. (Steven Schlansker) +New Features +--------------------- +* GITHUB#11795: Add ByteWritesTrackingDirectoryWrapper to expose metrics for bytes flushed and bytes merged, from + which a write amplification factor can be derived. (Marc D'Mello) + Improvements --------------------- * GITHUB#11778: Detailed part-of-speech information for particle(조사) and ending(어미) on Nori diff --git a/lucene/misc/src/java/org/apache/lucene/misc/store/ByteTrackingIndexOutput.java b/lucene/misc/src/java/org/apache/lucene/misc/store/ByteTrackingIndexOutput.java new file mode 100644 index 00000000000..4adb1938b4a --- /dev/null +++ b/lucene/misc/src/java/org/apache/lucene/misc/store/ByteTrackingIndexOutput.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.lucene.misc.store; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.lucene.store.IndexOutput; + +/** An {@link IndexOutput} that wraps another instance and tracks the number of bytes written: every write is forwarded to the wrapped output, and the wrapped output's file pointer is added to the supplied counter the first time this output is closed. */ +public class ByteTrackingIndexOutput extends IndexOutput { + + private final IndexOutput output; /* wrapped output that receives every write */ + private final AtomicLong byteTracker; /* caller-supplied counter, updated once on close */ + private boolean closed = false; /* true after the first close(), so the counter is not updated twice */ + + /** Wraps {@code output}; {@code byteTracker} receives the wrapped output's file pointer when this output is first closed. */ protected ByteTrackingIndexOutput(IndexOutput output, AtomicLong byteTracker) { + super( + "Byte tracking wrapper for: " + output.getName(), + "ByteTrackingIndexOutput{" + output.getName() + "}"); + this.output = output; + this.byteTracker = byteTracker; + } + + @Override + public void writeByte(byte b) throws IOException { + output.writeByte(b); + } + + @Override + public void writeBytes(byte[] b, int offset, int length) throws IOException { + output.writeBytes(b, offset, length); + } + + @Override + public void writeShort(short i) throws IOException { + output.writeShort(i); + } + + @Override + public void writeInt(int i) throws IOException { + output.writeInt(i); + } + + @Override + public void writeLong(long i) throws IOException { + output.writeLong(i); + } + + @Override + public void close() throws IOException { + if (closed) { /* bytes were already counted by an earlier close; only forward the call */ + output.close(); + return; + } + byteTracker.addAndGet(output.getFilePointer()); /* the file pointer is read before the wrapped output is closed */ + closed = true; + output.close(); + } + + @Override + public long getFilePointer() { + return output.getFilePointer(); + } + + @Override + public long getChecksum() throws IOException { + return output.getChecksum(); + } + + /** Returns the name of the wrapped output. */ public String getWrappedName() { + return output.getName(); + } + + /** Returns the {@code toString()} of the wrapped output. */ public String getWrappedToString() { + return output.toString(); + } +} diff --git a/lucene/misc/src/java/org/apache/lucene/misc/store/ByteWritesTrackingDirectoryWrapper.java b/lucene/misc/src/java/org/apache/lucene/misc/store/ByteWritesTrackingDirectoryWrapper.java new file mode 100644 index 00000000000..6619bf8ddbe --- /dev/null 
+++ b/lucene/misc/src/java/org/apache/lucene/misc/store/ByteWritesTrackingDirectoryWrapper.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.misc.store; + +import java.io.IOException; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FilterDirectory; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; + +/** {@link FilterDirectory} that tracks the inputs to a write amplification factor: outputs created with a FLUSH or MERGE context are wrapped in {@link ByteTrackingIndexOutput} so their bytes accumulate in {@link #getFlushedBytes()} and {@link #getMergedBytes()}; outputs created with any other context are returned unwrapped. */ +public final class ByteWritesTrackingDirectoryWrapper extends FilterDirectory { + + private final AtomicLong flushedBytes = new AtomicLong(); /* total from tracked FLUSH outputs */ + private final AtomicLong mergedBytes = new AtomicLong(); /* total from tracked MERGE outputs */ + public final boolean trackTempOutput; /* whether createTempOutput results are tracked as well */ + + /** Wraps {@code in} without tracking temporary outputs. */ public ByteWritesTrackingDirectoryWrapper(Directory in) { + this(in, false); + } + /** + * Creates a wrapper around {@code in}, optionally tracking temporary outputs as well + * + * @param in the Directory to wrap + * @param trackTempOutput if true, will also track temporary outputs created by this directory + */ + public ByteWritesTrackingDirectoryWrapper(Directory in, boolean trackTempOutput) { + super(in); + this.trackTempOutput = trackTempOutput; + } + + @Override + public IndexOutput 
createOutput(String name, IOContext ioContext) throws IOException { + IndexOutput output = in.createOutput(name, ioContext); + return createByteTrackingOutput(output, ioContext.context); + } + + @Override + public IndexOutput createTempOutput(String prefix, String suffix, IOContext ioContext) + throws IOException { + IndexOutput output = in.createTempOutput(prefix, suffix, ioContext); + return trackTempOutput ? createByteTrackingOutput(output, ioContext.context) : output; /* temp outputs are wrapped only when requested at construction */ + } + + /** Wraps {@code output} with the counter matching {@code context}, or returns it unchanged for contexts other than FLUSH and MERGE. */ private IndexOutput createByteTrackingOutput(IndexOutput output, IOContext.Context context) { + switch (context) { + case FLUSH: + return new ByteTrackingIndexOutput(output, flushedBytes); + case MERGE: + return new ByteTrackingIndexOutput(output, mergedBytes); + case DEFAULT: + case READ: + default: /* neither a flush nor a merge, so nothing is attributed and the output stays unwrapped */ + return output; + } + } + + /** Returns the bytes accumulated so far from tracked outputs created with a FLUSH context. */ public long getFlushedBytes() { + return flushedBytes.get(); + } + + /** Returns the bytes accumulated so far from tracked outputs created with a MERGE context. */ public long getMergedBytes() { + return mergedBytes.get(); + } +} diff --git a/lucene/misc/src/test/org/apache/lucene/misc/store/TestByteWritesTrackingDirectoryWrapper.java b/lucene/misc/src/test/org/apache/lucene/misc/store/TestByteWritesTrackingDirectoryWrapper.java new file mode 100644 index 00000000000..74d9e6e1b57 --- /dev/null +++ b/lucene/misc/src/test/org/apache/lucene/misc/store/TestByteWritesTrackingDirectoryWrapper.java @@ -0,0 +1,111 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.lucene.misc.store; + +import java.io.IOException; +import java.nio.file.Path; +import org.apache.lucene.store.ByteBuffersDirectory; +import org.apache.lucene.store.Directory; +import org.apache.lucene.store.FlushInfo; +import org.apache.lucene.store.IOContext; +import org.apache.lucene.store.IndexOutput; +import org.apache.lucene.store.MergeInfo; +import org.apache.lucene.tests.store.BaseDirectoryTestCase; + +/** Tests that {@link ByteWritesTrackingDirectoryWrapper} attributes written bytes to the flush or merge counter, and only once the output is closed. */ public class TestByteWritesTrackingDirectoryWrapper extends BaseDirectoryTestCase { + + /** A wrapper that has created no outputs reports zero for both counters. */ public void testEmptyDir() throws Exception { + ByteWritesTrackingDirectoryWrapper dir = + new ByteWritesTrackingDirectoryWrapper(new ByteBuffersDirectory()); + assertEquals(0.0, dir.getFlushedBytes(), 0.0); + assertEquals(0.0, dir.getMergedBytes(), 0.0); + } + + /** Writes a random number of bytes through a flush output and then a merge output, checking each counter before and after close. */ public void testRandomOutput() throws Exception { + ByteWritesTrackingDirectoryWrapper dir = + new ByteWritesTrackingDirectoryWrapper(new ByteBuffersDirectory()); + + int expectedFlushBytes = random().nextInt(100); + int expectedMergeBytes = random().nextInt(100); + + IndexOutput output = + dir.createOutput("write", new IOContext(new FlushInfo(10, expectedFlushBytes))); + byte[] flushBytesArr = new byte[expectedFlushBytes]; + for (int i = 0; i < expectedFlushBytes; i++) { + flushBytesArr[i] = (byte) random().nextInt(127); + } + output.writeBytes(flushBytesArr, flushBytesArr.length); + assertEquals(0.0, dir.getFlushedBytes(), 0.0); /* not counted until the output is closed */ + assertEquals(0.0, dir.getMergedBytes(), 0.0); + output.close(); + + // now merge bytes + output = + dir.createOutput("merge", new IOContext(new 
MergeInfo(10, expectedMergeBytes, false, 2))); + byte[] mergeBytesArr = new byte[expectedMergeBytes]; + for (int i = 0; i < expectedMergeBytes; i++) { + mergeBytesArr[i] = (byte) random().nextInt(127); + } + output.writeBytes(mergeBytesArr, mergeBytesArr.length); + assertEquals(expectedFlushBytes, dir.getFlushedBytes(), 0.0); + assertEquals(0.0, dir.getMergedBytes(), 0.0); /* the merge output is still open */ + output.close(); + + assertEquals(expectedFlushBytes, dir.getFlushedBytes(), 0.0); + assertEquals(expectedMergeBytes, dir.getMergedBytes(), 0.0); + } + + /** Same scenario as testRandomOutput, but through createTempOutput on a wrapper built with trackTempOutput set to true. */ public void testRandomTempOutput() throws Exception { + ByteWritesTrackingDirectoryWrapper dir = + new ByteWritesTrackingDirectoryWrapper(new ByteBuffersDirectory(), true); + + int expectedFlushBytes = random().nextInt(100); + int expectedMergeBytes = random().nextInt(100); + + IndexOutput output = + dir.createTempOutput("temp", "write", new IOContext(new FlushInfo(10, expectedFlushBytes))); + byte[] flushBytesArr = new byte[expectedFlushBytes]; + for (int i = 0; i < expectedFlushBytes; i++) { + flushBytesArr[i] = (byte) random().nextInt(127); + } + output.writeBytes(flushBytesArr, flushBytesArr.length); + assertEquals(0.0, dir.getFlushedBytes(), 0.0); /* not counted until the output is closed */ + assertEquals(0.0, dir.getMergedBytes(), 0.0); + output.close(); + + // now merge bytes + output = + dir.createTempOutput( + "temp", "merge", new IOContext(new MergeInfo(10, expectedMergeBytes, false, 2))); + byte[] mergeBytesArr = new byte[expectedMergeBytes]; + for (int i = 0; i < expectedMergeBytes; i++) { + mergeBytesArr[i] = (byte) random().nextInt(127); + } + output.writeBytes(mergeBytesArr, mergeBytesArr.length); + assertEquals(expectedFlushBytes, dir.getFlushedBytes(), 0.0); + assertEquals(0.0, dir.getMergedBytes(), 0.0); /* the merge output is still open */ + output.close(); + + assertEquals(expectedFlushBytes, dir.getFlushedBytes(), 0.0); + assertEquals(expectedMergeBytes, dir.getMergedBytes(), 0.0); + } + + @Override + protected Directory getDirectory(Path path) throws IOException { /* path is unused; a new ByteBuffersDirectory is wrapped instead */ + return new 
ByteWritesTrackingDirectoryWrapper(new ByteBuffersDirectory()); + } +}