diff --git a/server/src/main/java/org/apache/lucene/queries/BinaryDocValuesRangeQuery.java b/server/src/main/java/org/apache/lucene/queries/BinaryDocValuesRangeQuery.java index bb5da686f52..82778d31f6c 100644 --- a/server/src/main/java/org/apache/lucene/queries/BinaryDocValuesRangeQuery.java +++ b/server/src/main/java/org/apache/lucene/queries/BinaryDocValuesRangeQuery.java @@ -43,8 +43,8 @@ import org.apache.lucene.search.ScoreMode; import org.apache.lucene.search.Scorer; import org.apache.lucene.search.TwoPhaseIterator; import org.apache.lucene.search.Weight; -import org.apache.lucene.store.ByteArrayDataInput; import org.apache.lucene.util.BytesRef; +import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.index.mapper.RangeType; import java.io.IOException; @@ -91,7 +91,7 @@ public final class BinaryDocValuesRangeQuery extends Query { final TwoPhaseIterator iterator = new TwoPhaseIterator(values) { - ByteArrayDataInput in = new ByteArrayDataInput(); + BytesStreamInput in = new BytesStreamInput(); BytesRef otherFrom = new BytesRef(); BytesRef otherTo = new BytesRef(); diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchHelper.java b/server/src/main/java/org/opensearch/action/search/TransportSearchHelper.java index 875fb2019b8..76770245a3d 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchHelper.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchHelper.java @@ -32,10 +32,10 @@ package org.opensearch.action.search; -import org.apache.lucene.store.ByteArrayDataInput; import org.apache.lucene.store.RAMOutputStream; import org.opensearch.LegacyESVersion; import org.opensearch.Version; +import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.util.concurrent.AtomicArray; import org.opensearch.search.SearchPhaseResult; import org.opensearch.search.SearchShardTarget; @@ -88,7 +88,7 @@ final class TransportSearchHelper { static ParsedScrollId parseScrollId(String scrollId) { try { byte[] bytes = Base64.getUrlDecoder().decode(scrollId); - ByteArrayDataInput in = new ByteArrayDataInput(bytes); + BytesStreamInput in = new BytesStreamInput(bytes); final boolean includeContextUUID; final String type; final String firstChunk = in.readString(); diff --git a/server/src/main/java/org/opensearch/common/io/stream/BytesStreamInput.java b/server/src/main/java/org/opensearch/common/io/stream/BytesStreamInput.java new file mode 100644 index 00000000000..e593f3c89b0 --- /dev/null +++ b/server/src/main/java/org/opensearch/common/io/stream/BytesStreamInput.java @@ -0,0 +1,115 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io.stream; + +import org.apache.lucene.util.BytesRef; + +import java.io.EOFException; +import java.io.IOException; + +/** + * {@link StreamInput} version of Lucene's {@link org.apache.lucene.store.ByteArrayDataInput} + * This is used as a replacement of Lucene ByteArrayDataInput for abstracting byte order changes + * in Lucene's API + * + * Attribution given to apache lucene project under ALv2: + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public class BytesStreamInput extends StreamInput { + private byte[] bytes; + private int pos; + private int limit; + + public BytesStreamInput(byte[] bytes) { + reset(bytes); + } + + public BytesStreamInput(byte[] bytes, int offset, int len) { + reset(bytes, offset, len); + } + + public BytesStreamInput() { + reset(BytesRef.EMPTY_BYTES); + } + + public void reset(byte[] bytes) { + reset(bytes, 0, bytes.length); + } + + public int getPosition() { + return pos; + } + + public void setPosition(int pos) { + this.pos = pos; + } + + public void reset(byte[] bytes, int offset, int len) { + this.bytes = bytes; + pos = offset; + limit = offset + len; + } + + public boolean eof() { + return pos == limit; + } + + public void skipBytes(long count) { + pos += count; + } + + // NOTE: AIOOBE not EOF if you read too much + @Override + public byte readByte() { + return bytes[pos++]; + } + + // NOTE: AIOOBE not EOF if you read too much + @Override + public void readBytes(byte[] b, int offset, int len) { + System.arraycopy(bytes, pos, b, offset, len); + pos += len; + } + + @Override + public void close() {} + + @Override + public int available() { + return limit - pos; + } + + @Override + protected void ensureCanReadBytes(int length) throws EOFException { + int available = available(); + if (length > available) { + throw new EOFException("attempting to read " + length + " bytes but only " + available + " bytes are available"); + } + } + + @Override + public int read() throws IOException { + return bytes[pos++] & 0xFF; + } + +} diff --git a/server/src/main/java/org/opensearch/common/util/ByteUtils.java b/server/src/main/java/org/opensearch/common/util/ByteUtils.java index 65cf9b567c7..b0054bc1a06 100644 --- a/server/src/main/java/org/opensearch/common/util/ByteUtils.java +++ b/server/src/main/java/org/opensearch/common/util/ByteUtils.java @@ -32,15 +32,11 @@ package org.opensearch.common.util; -import org.apache.lucene.store.ByteArrayDataInput; -import org.apache.lucene.store.ByteArrayDataOutput; - /** Utility methods to do byte-level encoding. These methods are biased towards little-endian byte order because it is the most * common byte order and reading several bytes at once may be optimizable in the future with the help of sun.mist.Unsafe. */ -public enum ByteUtils { - ; +public final class ByteUtils { - public static final int MAX_BYTES_VLONG = 9; + private ByteUtils() {}; /** Zig-zag decode. */ public static long zigZagDecode(long n) { @@ -107,46 +103,4 @@ public enum ByteUtils { public static float readFloatLE(byte[] arr, int offset) { return Float.intBitsToFloat(readIntLE(arr, offset)); } - - /** Same as DataOutput#writeVLong but accepts negative values (written on 9 bytes). */ - public static void writeVLong(ByteArrayDataOutput out, long i) { - for (int k = 0; k < 8 && (i & ~0x7FL) != 0L; ++k) { - out.writeByte((byte) ((i & 0x7FL) | 0x80L)); - i >>>= 7; - } - out.writeByte((byte) i); - } - - /** Same as DataOutput#readVLong but can read negative values (read on 9 bytes). */ - public static long readVLong(ByteArrayDataInput in) { - // unwinded because of hotspot bugs, see Lucene's impl - byte b = in.readByte(); - if (b >= 0) return b; - long i = b & 0x7FL; - b = in.readByte(); - i |= (b & 0x7FL) << 7; - if (b >= 0) return i; - b = in.readByte(); - i |= (b & 0x7FL) << 14; - if (b >= 0) return i; - b = in.readByte(); - i |= (b & 0x7FL) << 21; - if (b >= 0) return i; - b = in.readByte(); - i |= (b & 0x7FL) << 28; - if (b >= 0) return i; - b = in.readByte(); - i |= (b & 0x7FL) << 35; - if (b >= 0) return i; - b = in.readByte(); - i |= (b & 0x7FL) << 42; - if (b >= 0) return i; - b = in.readByte(); - i |= (b & 0x7FL) << 49; - if (b >= 0) return i; - b = in.readByte(); - i |= (b & 0xFFL) << 56; - return i; - } - } diff --git a/server/src/main/java/org/opensearch/index/fielddata/plain/AbstractBinaryDVLeafFieldData.java b/server/src/main/java/org/opensearch/index/fielddata/plain/AbstractBinaryDVLeafFieldData.java index bf641a91f37..787401386c1 100644 --- a/server/src/main/java/org/opensearch/index/fielddata/plain/AbstractBinaryDVLeafFieldData.java +++ b/server/src/main/java/org/opensearch/index/fielddata/plain/AbstractBinaryDVLeafFieldData.java @@ -33,9 +33,9 @@ package org.opensearch.index.fielddata.plain; import org.apache.lucene.index.BinaryDocValues; -import org.apache.lucene.store.ByteArrayDataInput; import org.apache.lucene.util.Accountable; import org.apache.lucene.util.BytesRef; +import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.index.fielddata.LeafFieldData; import org.opensearch.index.fielddata.SortedBinaryDocValues; @@ -66,7 +66,7 @@ abstract class AbstractBinaryDVLeafFieldData implements LeafFieldData { return new SortedBinaryDocValues() { int count; - final ByteArrayDataInput in = new ByteArrayDataInput(); + final BytesStreamInput in = new BytesStreamInput(); final BytesRef scratch = new BytesRef(); @Override diff --git a/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java b/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java index d2a34064987..e55e339c3fd 100644 --- a/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java +++ b/server/src/main/java/org/opensearch/index/mapper/BinaryFieldMapper.java @@ -35,11 +35,11 @@ package org.opensearch.index.mapper; import com.carrotsearch.hppc.ObjectArrayList; import org.apache.lucene.document.StoredField; import org.apache.lucene.search.Query; -import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.util.BytesRef; import org.opensearch.OpenSearchException; import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.BytesReference; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.util.CollectionUtils; import org.opensearch.common.xcontent.XContentParser; import org.opensearch.index.fielddata.IndexFieldData; @@ -240,8 +240,7 @@ public class BinaryFieldMapper extends ParametrizedFieldMapper { try { CollectionUtils.sortAndDedup(bytesList); int size = bytesList.size(); - final byte[] bytes = new byte[totalSize + (size + 1) * 5]; - ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + BytesStreamOutput out = new BytesStreamOutput(totalSize + (size + 1) * 5); out.writeVInt(size); // write total number of values for (int i = 0; i < size; i++) { final byte[] value = bytesList.get(i); @@ -249,7 +248,7 @@ public class BinaryFieldMapper extends ParametrizedFieldMapper { out.writeVInt(valueLength); out.writeBytes(value, 0, valueLength); } - return new BytesRef(bytes, 0, out.getPosition()); + return out.bytes().toBytesRef(); } catch (IOException e) { throw new OpenSearchException("Failed to get binary value", e); } diff --git a/server/src/main/java/org/opensearch/index/mapper/BinaryRangeUtil.java b/server/src/main/java/org/opensearch/index/mapper/BinaryRangeUtil.java index ad54840888a..96744ba2d01 100644 --- a/server/src/main/java/org/opensearch/index/mapper/BinaryRangeUtil.java +++ b/server/src/main/java/org/opensearch/index/mapper/BinaryRangeUtil.java @@ -33,11 +33,11 @@ package org.opensearch.index.mapper; import org.apache.lucene.document.InetAddressPoint; -import org.apache.lucene.store.ByteArrayDataInput; -import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.util.BytesRef; import org.apache.lucene.util.NumericUtils; import org.opensearch.common.TriFunction; +import org.opensearch.common.io.stream.BytesStreamInput; +import org.opensearch.common.io.stream.BytesStreamOutput; import java.io.IOException; import java.net.InetAddress; @@ -47,13 +47,12 @@ import java.util.Comparator; import java.util.List; import java.util.Set; -enum BinaryRangeUtil { +final class BinaryRangeUtil { - ; + private BinaryRangeUtil() {}; static BytesRef encodeIPRanges(Set ranges) throws IOException { - final byte[] encoded = new byte[5 + (16 * 2) * ranges.size()]; - ByteArrayDataOutput out = new ByteArrayDataOutput(encoded); + final BytesStreamOutput out = new BytesStreamOutput(5 + (16 * 2) * ranges.size()); out.writeVInt(ranges.size()); for (RangeFieldMapper.Range range : ranges) { InetAddress fromValue = (InetAddress) range.from; @@ -64,10 +63,10 @@ enum BinaryRangeUtil { byte[] encodedToValue = InetAddressPoint.encode(toValue); out.writeBytes(encodedToValue, 0, encodedToValue.length); } - return new BytesRef(encoded, 0, out.getPosition()); + return out.bytes().toBytesRef(); } - static List decodeIPRanges(BytesRef encodedRanges) { + static List decodeIPRanges(BytesRef encodedRanges) throws IOException { return decodeRanges(encodedRanges, RangeType.IP, BinaryRangeUtil::decodeIP); } @@ -83,8 +82,7 @@ enum BinaryRangeUtil { Comparator toComparator = Comparator.comparingLong(range -> ((Number) range.to).longValue()); sortedRanges.sort(fromComparator.thenComparing(toComparator)); - final byte[] encoded = new byte[5 + (9 * 2) * sortedRanges.size()]; - ByteArrayDataOutput out = new ByteArrayDataOutput(encoded); + final BytesStreamOutput out = new BytesStreamOutput(5 + (9 * 2) * sortedRanges.size()); out.writeVInt(sortedRanges.size()); for (RangeFieldMapper.Range range : sortedRanges) { byte[] encodedFrom = encodeLong(((Number) range.from).longValue()); @@ -92,10 +90,10 @@ enum BinaryRangeUtil { byte[] encodedTo = encodeLong(((Number) range.to).longValue()); out.writeBytes(encodedTo, encodedTo.length); } - return new BytesRef(encoded, 0, out.getPosition()); + return out.bytes().toBytesRef(); } - static List decodeLongRanges(BytesRef encodedRanges) { + static List decodeLongRanges(BytesRef encodedRanges) throws IOException { return decodeRanges(encodedRanges, RangeType.LONG, BinaryRangeUtil::decodeLong); } @@ -105,8 +103,7 @@ enum BinaryRangeUtil { Comparator toComparator = Comparator.comparingDouble(range -> ((Number) range.to).doubleValue()); sortedRanges.sort(fromComparator.thenComparing(toComparator)); - final byte[] encoded = new byte[5 + (8 * 2) * sortedRanges.size()]; - ByteArrayDataOutput out = new ByteArrayDataOutput(encoded); + final BytesStreamOutput out = new BytesStreamOutput(5 + (8 * 2) * sortedRanges.size()); out.writeVInt(sortedRanges.size()); for (RangeFieldMapper.Range range : sortedRanges) { byte[] encodedFrom = encodeDouble(((Number) range.from).doubleValue()); @@ -114,14 +111,14 @@ enum BinaryRangeUtil { byte[] encodedTo = encodeDouble(((Number) range.to).doubleValue()); out.writeBytes(encodedTo, encodedTo.length); } - return new BytesRef(encoded, 0, out.getPosition()); + return out.bytes().toBytesRef(); } - static List decodeDoubleRanges(BytesRef encodedRanges) { + static List decodeDoubleRanges(BytesRef encodedRanges) throws IOException { return decodeRanges(encodedRanges, RangeType.DOUBLE, BinaryRangeUtil::decodeDouble); } - static List decodeFloatRanges(BytesRef encodedRanges) { + static List decodeFloatRanges(BytesRef encodedRanges) throws IOException { return decodeRanges(encodedRanges, RangeType.FLOAT, BinaryRangeUtil::decodeFloat); } @@ -129,11 +126,10 @@ enum BinaryRangeUtil { BytesRef encodedRanges, RangeType rangeType, TriFunction decodeBytes - ) { + ) throws IOException { RangeType.LengthType lengthType = rangeType.lengthType; - ByteArrayDataInput in = new ByteArrayDataInput(); - in.reset(encodedRanges.bytes, encodedRanges.offset, encodedRanges.length); + BytesStreamInput in = new BytesStreamInput(encodedRanges.bytes, encodedRanges.offset, encodedRanges.length); int numRanges = in.readVInt(); List ranges = new ArrayList<>(numRanges); @@ -161,8 +157,7 @@ enum BinaryRangeUtil { Comparator toComparator = Comparator.comparingDouble(range -> ((Number) range.to).floatValue()); sortedRanges.sort(fromComparator.thenComparing(toComparator)); - final byte[] encoded = new byte[5 + (4 * 2) * sortedRanges.size()]; - ByteArrayDataOutput out = new ByteArrayDataOutput(encoded); + final BytesStreamOutput out = new BytesStreamOutput(5 + (4 * 2) * sortedRanges.size()); out.writeVInt(sortedRanges.size()); for (RangeFieldMapper.Range range : sortedRanges) { byte[] encodedFrom = encodeFloat(((Number) range.from).floatValue()); @@ -170,7 +165,7 @@ enum BinaryRangeUtil { byte[] encodedTo = encodeFloat(((Number) range.to).floatValue()); out.writeBytes(encodedTo, encodedTo.length); } - return new BytesRef(encoded, 0, out.getPosition()); + return out.bytes().toBytesRef(); } static byte[] encodeDouble(double number) { diff --git a/server/src/main/java/org/opensearch/index/mapper/RangeType.java b/server/src/main/java/org/opensearch/index/mapper/RangeType.java index c23aab9791c..9b0c374f8b5 100644 --- a/server/src/main/java/org/opensearch/index/mapper/RangeType.java +++ b/server/src/main/java/org/opensearch/index/mapper/RangeType.java @@ -273,7 +273,7 @@ public enum RangeType { } @Override - public List decodeRanges(BytesRef bytes) { + public List decodeRanges(BytesRef bytes) throws IOException { return LONG.decodeRanges(bytes); } @@ -375,7 +375,7 @@ public enum RangeType { } @Override - public List decodeRanges(BytesRef bytes) { + public List decodeRanges(BytesRef bytes) throws IOException { return BinaryRangeUtil.decodeFloatRanges(bytes); } @@ -486,7 +486,7 @@ public enum RangeType { } @Override - public List decodeRanges(BytesRef bytes) { + public List decodeRanges(BytesRef bytes) throws IOException { return BinaryRangeUtil.decodeDoubleRanges(bytes); } @@ -600,7 +600,7 @@ public enum RangeType { } @Override - public List decodeRanges(BytesRef bytes) { + public List decodeRanges(BytesRef bytes) throws IOException { return LONG.decodeRanges(bytes); } @@ -692,7 +692,7 @@ public enum RangeType { } @Override - public List decodeRanges(BytesRef bytes) { + public List decodeRanges(BytesRef bytes) throws IOException { return BinaryRangeUtil.decodeLongRanges(bytes); } @@ -946,7 +946,7 @@ public enum RangeType { // rounded up via parseFrom and parseTo methods. public abstract BytesRef encodeRanges(Set ranges) throws IOException; - public abstract List decodeRanges(BytesRef bytes); + public abstract List decodeRanges(BytesRef bytes) throws IOException; /** * Given the Range.to or Range.from Object value from a Range instance, converts that value into a Double. Before converting, it diff --git a/server/src/main/java/org/opensearch/index/store/Store.java b/server/src/main/java/org/opensearch/index/store/Store.java index d822a824b81..d44fd07ccb2 100644 --- a/server/src/main/java/org/opensearch/index/store/Store.java +++ b/server/src/main/java/org/opensearch/index/store/Store.java @@ -50,7 +50,6 @@ import org.apache.lucene.index.SegmentCommitInfo; import org.apache.lucene.index.SegmentInfos; import org.apache.lucene.store.AlreadyClosedException; import org.apache.lucene.store.BufferedChecksum; -import org.apache.lucene.store.ByteArrayDataInput; import org.apache.lucene.store.ChecksumIndexInput; import org.apache.lucene.store.Directory; import org.apache.lucene.store.FilterDirectory; @@ -67,6 +66,7 @@ import org.opensearch.ExceptionsHelper; import org.opensearch.common.UUIDs; import org.opensearch.common.bytes.BytesReference; import org.opensearch.common.io.Streams; +import org.opensearch.common.io.stream.BytesStreamInput; import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.io.stream.StreamInput; import org.opensearch.common.io.stream.StreamOutput; @@ -1426,11 +1426,11 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref throw new UnsupportedOperationException(); } - public long getStoredChecksum() { - return new ByteArrayDataInput(checksum).readLong(); + public long getStoredChecksum() throws IOException { + return new BytesStreamInput(checksum).readLong(); } - public long verify() throws CorruptIndexException { + public long verify() throws CorruptIndexException, IOException { long storedChecksum = getStoredChecksum(); if (getChecksum() == storedChecksum) { return storedChecksum; diff --git a/server/src/test/java/org/opensearch/common/io/stream/StreamTests.java b/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java similarity index 93% rename from server/src/test/java/org/opensearch/common/io/stream/StreamTests.java rename to server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java index 662992bbec1..b92e59e43e0 100644 --- a/server/src/test/java/org/opensearch/common/io/stream/StreamTests.java +++ b/server/src/test/java/org/opensearch/common/io/stream/BaseStreamTests.java @@ -70,7 +70,9 @@ import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.iterableWithSize; import static org.hamcrest.Matchers.nullValue; -public class StreamTests extends OpenSearchTestCase { +public abstract class BaseStreamTests extends OpenSearchTestCase { + + protected abstract StreamInput getStreamInput(BytesReference bytesReference) throws IOException; public void testBooleanSerialization() throws IOException { final BytesStreamOutput output = new BytesStreamOutput(); @@ -85,7 +87,7 @@ public class StreamTests extends OpenSearchTestCase { assertThat(bytes[0], equalTo((byte) 0)); assertThat(bytes[1], equalTo((byte) 1)); - final StreamInput input = bytesReference.streamInput(); + final StreamInput input = getStreamInput(bytesReference); assertFalse(input.readBoolean()); assertTrue(input.readBoolean()); @@ -114,7 +116,7 @@ public class StreamTests extends OpenSearchTestCase { assertThat(bytes[1], equalTo((byte) 1)); assertThat(bytes[2], equalTo((byte) 2)); - final StreamInput input = bytesReference.streamInput(); + final StreamInput input = getStreamInput(bytesReference); final Boolean maybeFalse = input.readOptionalBoolean(); assertNotNull(maybeFalse); assertFalse(maybeFalse); @@ -139,7 +141,7 @@ public class StreamTests extends OpenSearchTestCase { long write = randomLong(); BytesStreamOutput out = new BytesStreamOutput(); out.writeZLong(write); - long read = out.bytes().streamInput().readZLong(); + long read = getStreamInput(out.bytes()).readZLong(); assertEquals(write, read); } } @@ -184,7 +186,7 @@ public class StreamTests extends OpenSearchTestCase { } BytesStreamOutput out = new BytesStreamOutput(); out.writeGenericValue(write); - LinkedHashMap read = (LinkedHashMap) out.bytes().streamInput().readGenericValue(); + LinkedHashMap read = (LinkedHashMap) getStreamInput(out.bytes()).readGenericValue(); assertEquals(size, read.size()); int index = 0; for (Map.Entry entry : read.entrySet()) { @@ -251,10 +253,10 @@ public class StreamTests extends OpenSearchTestCase { sourceArray = null; } out.writeOptionalArray(sourceArray); - targetArray = out.bytes().streamInput().readOptionalArray(WriteableString::new, WriteableString[]::new); + targetArray = getStreamInput(out.bytes()).readOptionalArray(WriteableString::new, WriteableString[]::new); } else { out.writeArray(sourceArray); - targetArray = out.bytes().streamInput().readArray(WriteableString::new, WriteableString[]::new); + targetArray = getStreamInput(out.bytes()).readArray(WriteableString::new, WriteableString[]::new); } assertThat(targetArray, equalTo(sourceArray)); @@ -273,11 +275,11 @@ public class StreamTests extends OpenSearchTestCase { strings = generateRandomStringArray(10, 10, false, true); } out.writeOptionalArray(writer, strings); - deserialized = out.bytes().streamInput().readOptionalArray(reader, String[]::new); + deserialized = getStreamInput(out.bytes()).readOptionalArray(reader, String[]::new); } else { strings = generateRandomStringArray(10, 10, false, true); out.writeArray(writer, strings); - deserialized = out.bytes().streamInput().readArray(reader, String[]::new); + deserialized = getStreamInput(out.bytes()).readArray(reader, String[]::new); } assertThat(deserialized, equalTo(strings)); } @@ -342,7 +344,7 @@ public class StreamTests extends OpenSearchTestCase { } try (BytesStreamOutput out = new BytesStreamOutput()) { writer.accept(out, collection); - try (StreamInput in = out.bytes().streamInput()) { + try (StreamInput in = getStreamInput(out.bytes())) { assertThat(collection, equalTo(reader.apply(in))); } } @@ -359,7 +361,7 @@ public class StreamTests extends OpenSearchTestCase { final BytesStreamOutput out = new BytesStreamOutput(); out.writeCollection(sourceSet, StreamOutput::writeLong); - final Set targetSet = out.bytes().streamInput().readSet(StreamInput::readLong); + final Set targetSet = getStreamInput(out.bytes()).readSet(StreamInput::readLong); assertThat(targetSet, equalTo(sourceSet)); } @@ -367,7 +369,7 @@ public class StreamTests extends OpenSearchTestCase { final Instant instant = Instant.now(); try (BytesStreamOutput out = new BytesStreamOutput()) { out.writeInstant(instant); - try (StreamInput in = out.bytes().streamInput()) { + try (StreamInput in = getStreamInput(out.bytes())) { final Instant serialized = in.readInstant(); assertEquals(instant, serialized); } @@ -378,7 +380,7 @@ public class StreamTests extends OpenSearchTestCase { final Instant instant = Instant.now(); try (BytesStreamOutput out = new BytesStreamOutput()) { out.writeOptionalInstant(instant); - try (StreamInput in = out.bytes().streamInput()) { + try (StreamInput in = getStreamInput(out.bytes())) { final Instant serialized = in.readOptionalInstant(); assertEquals(instant, serialized); } @@ -387,7 +389,7 @@ public class StreamTests extends OpenSearchTestCase { final Instant missing = null; try (BytesStreamOutput out = new BytesStreamOutput()) { out.writeOptionalInstant(missing); - try (StreamInput in = out.bytes().streamInput()) { + try (StreamInput in = getStreamInput(out.bytes())) { final Instant serialized = in.readOptionalInstant(); assertEquals(missing, serialized); } @@ -437,7 +439,8 @@ public class StreamTests extends OpenSearchTestCase { output.writeSecureString(secureString); final BytesReference bytesReference = output.bytes(); - final StreamInput input = bytesReference.streamInput(); + final StreamInput input = getStreamInput(bytesReference); + ; assertThat(secureString, is(equalTo(input.readSecureString()))); } @@ -447,7 +450,8 @@ public class StreamTests extends OpenSearchTestCase { output.writeOptionalSecureString(secureString); final BytesReference bytesReference = output.bytes(); - final StreamInput input = bytesReference.streamInput(); + final StreamInput input = getStreamInput(bytesReference); + ; if (secureString != null) { assertThat(input.readOptionalSecureString(), is(equalTo(secureString))); @@ -507,7 +511,8 @@ public class StreamTests extends OpenSearchTestCase { try (BytesStreamOutput output = new BytesStreamOutput()) { outputAssertions.accept(output); final BytesReference bytesReference = output.bytes(); - final StreamInput input = bytesReference.streamInput(); + final StreamInput input = getStreamInput(bytesReference); + ; inputAssertions.accept(input); } } diff --git a/server/src/test/java/org/opensearch/common/io/stream/ByteBufferStreamInputTests.java b/server/src/test/java/org/opensearch/common/io/stream/ByteBufferStreamInputTests.java new file mode 100644 index 00000000000..1061b335d71 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/io/stream/ByteBufferStreamInputTests.java @@ -0,0 +1,25 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io.stream; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.bytes.BytesReference; + +import java.io.IOException; +import java.nio.ByteBuffer; + +/** test the ByteBufferStreamInput using the same BaseStreamTests */ +public class ByteBufferStreamInputTests extends BaseStreamTests { + @Override + protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException { + BytesRef br = bytesReference.toBytesRef(); + ByteBuffer bb = ByteBuffer.wrap(br.bytes, br.offset, br.length); + return new ByteBufferStreamInput(bb); + } +} diff --git a/server/src/test/java/org/opensearch/common/io/stream/BytesReferenceStreamInputTests.java b/server/src/test/java/org/opensearch/common/io/stream/BytesReferenceStreamInputTests.java new file mode 100644 index 00000000000..ed77c3130a3 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/io/stream/BytesReferenceStreamInputTests.java @@ -0,0 +1,21 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io.stream; + +import org.opensearch.common.bytes.BytesReference; + +import java.io.IOException; + +/** test the BytesReferenceStream using the same BaseStreamTests */ +public class BytesReferenceStreamInputTests extends BaseStreamTests { + @Override + protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException { + return bytesReference.streamInput(); + } +} diff --git a/server/src/test/java/org/opensearch/common/io/stream/BytesStreamInputTests.java b/server/src/test/java/org/opensearch/common/io/stream/BytesStreamInputTests.java new file mode 100644 index 00000000000..c7a47e7580b --- /dev/null +++ b/server/src/test/java/org/opensearch/common/io/stream/BytesStreamInputTests.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io.stream; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.bytes.BytesReference; + +import java.io.IOException; + +/** test the BytesStreamInput using the same BaseStreamTests */ +public class BytesStreamInputTests extends BaseStreamTests { + @Override + protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException { + BytesRef br = bytesReference.toBytesRef(); + return new BytesStreamInput(br.bytes, br.offset, br.length); + } +} diff --git a/server/src/test/java/org/opensearch/common/io/stream/FilterStreamInputTests.java b/server/src/test/java/org/opensearch/common/io/stream/FilterStreamInputTests.java new file mode 100644 index 00000000000..3cf9dc656a8 --- /dev/null +++ b/server/src/test/java/org/opensearch/common/io/stream/FilterStreamInputTests.java @@ -0,0 +1,24 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io.stream; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.bytes.BytesReference; + +import java.io.IOException; + +/** test the FilterStreamInput using the same BaseStreamTests */ +public class FilterStreamInputTests extends BaseStreamTests { + @Override + protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException { + BytesRef br = bytesReference.toBytesRef(); + return new FilterStreamInput(StreamInput.wrap(br.bytes, br.offset, br.length)) { + }; + } +} diff --git a/server/src/test/java/org/opensearch/common/io/stream/InputStreamStreamInputTests.java b/server/src/test/java/org/opensearch/common/io/stream/InputStreamStreamInputTests.java new file mode 100644 index 00000000000..6a31c21445a --- /dev/null +++ b/server/src/test/java/org/opensearch/common/io/stream/InputStreamStreamInputTests.java @@ -0,0 +1,23 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.common.io.stream; + +import org.apache.lucene.util.BytesRef; +import org.opensearch.common.bytes.BytesReference; + +import java.io.IOException; + +/** test the InputStreamStreamInput using the same BaseStreamTests */ +public class InputStreamStreamInputTests extends BaseStreamTests { + @Override + protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException { + BytesRef br = bytesReference.toBytesRef(); + return new InputStreamStreamInput(StreamInput.wrap(br.bytes, br.offset, br.length)); + } +} diff --git a/server/src/test/java/org/opensearch/common/util/ByteUtilsTests.java b/server/src/test/java/org/opensearch/common/util/ByteUtilsTests.java index 47fdfe5ef85..95c77e5d606 100644 --- a/server/src/test/java/org/opensearch/common/util/ByteUtilsTests.java +++ b/server/src/test/java/org/opensearch/common/util/ByteUtilsTests.java @@ -32,8 +32,6 @@ package org.opensearch.common.util; -import org.apache.lucene.store.ByteArrayDataInput; -import org.apache.lucene.store.ByteArrayDataOutput; import org.opensearch.test.OpenSearchTestCase; import java.io.IOException; @@ -81,42 +79,4 @@ public class ByteUtilsTests extends OpenSearchTestCase { } } - public void testVLong() throws IOException { - final long[] data = new long[scaledRandomIntBetween(1000, 10000)]; - for (int i = 0; i < data.length; ++i) { - switch (randomInt(4)) { - case 0: - data[i] = 0; - break; - case 1: - data[i] = Long.MAX_VALUE; - break; - case 2: - data[i] = Long.MIN_VALUE; - break; - case 3: - data[i] = randomInt(1 << randomIntBetween(2, 30)); - break; - case 4: - data[i] = randomLong(); - break; - default: - throw new AssertionError(); - } - } - final byte[] encoded = new byte[ByteUtils.MAX_BYTES_VLONG * data.length]; - ByteArrayDataOutput out = new ByteArrayDataOutput(encoded); - for (int i = 0; i < data.length; ++i) { - final int pos = out.getPosition(); - ByteUtils.writeVLong(out, data[i]); - if (data[i] < 0) { - assertEquals(ByteUtils.MAX_BYTES_VLONG, out.getPosition() - pos); - } - } - final ByteArrayDataInput in = new ByteArrayDataInput(encoded); - for (int i = 0; i < data.length; ++i) { - assertEquals(data[i], ByteUtils.readVLong(in)); - } - } - } diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java index 18060f493ad..e5a41fdc15d 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogDeletionPolicyTests.java @@ -32,11 +32,10 @@ package org.opensearch.index.translog; -import org.apache.lucene.store.ByteArrayDataOutput; import org.opensearch.common.UUIDs; -import org.opensearch.common.bytes.BytesArray; import org.opensearch.common.bytes.ReleasableBytesReference; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.common.lease.Releasable; import org.opensearch.common.util.BigArrays; import org.opensearch.core.internal.io.IOUtils; @@ -261,15 +260,14 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase { ); writer = Mockito.spy(writer); Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime(); - byte[] bytes = new byte[4]; - ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + BytesStreamOutput out = new BytesStreamOutput(4); final long startSeqNo = (gen - 1) * TOTAL_OPS_IN_GEN; final long endSeqNo = startSeqNo + TOTAL_OPS_IN_GEN - 1; for (long ops = endSeqNo; ops >= startSeqNo; ops--) { - out.reset(bytes); + out.reset(); out.writeInt((int) ops); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops); + writer.add(ReleasableBytesReference.wrap(out.bytes()), ops); } } return new Tuple<>(readers, writer); diff --git a/server/src/test/java/org/opensearch/index/translog/TranslogTests.java b/server/src/test/java/org/opensearch/index/translog/TranslogTests.java index e1d348e75d8..5614e07d710 100644 --- a/server/src/test/java/org/opensearch/index/translog/TranslogTests.java +++ b/server/src/test/java/org/opensearch/index/translog/TranslogTests.java @@ -43,7 +43,6 @@ import org.apache.lucene.index.Term; import org.apache.lucene.mockfile.FilterFileChannel; import org.apache.lucene.mockfile.FilterFileSystemProvider; import org.apache.lucene.store.AlreadyClosedException; -import org.apache.lucene.store.ByteArrayDataOutput; import org.apache.lucene.store.MockDirectoryWrapper; import org.apache.lucene.util.LineFileDocs; import org.apache.lucene.util.LuceneTestCase; @@ -1424,8 +1423,7 @@ public class TranslogTests extends OpenSearchTestCase { final Set seenSeqNos = new HashSet<>(); boolean opsHaveValidSequenceNumbers = randomBoolean(); for (int i = 0; i < numOps; i++) { - byte[] bytes = new byte[4]; - ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + BytesStreamOutput out = new BytesStreamOutput(4); out.writeInt(i); long seqNo; do { @@ -1435,7 +1433,7 @@ public class TranslogTests extends OpenSearchTestCase { if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) { seenSeqNos.add(seqNo); } - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo); + writer.add(ReleasableBytesReference.wrap(out.bytes()), seqNo); } assertThat(persistedSeqNos, empty()); writer.sync(); @@ -1457,10 +1455,9 @@ public class TranslogTests extends OpenSearchTestCase { assertThat(reader.getCheckpoint().minSeqNo, equalTo(minSeqNo)); assertThat(reader.getCheckpoint().maxSeqNo, equalTo(maxSeqNo)); - byte[] bytes = new byte[4]; - ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); + BytesStreamOutput out = new BytesStreamOutput(4); out.writeInt(2048); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); + writer.add(ReleasableBytesReference.wrap(out.bytes()), randomNonNegativeLong()); if (reader instanceof TranslogReader) { ByteBuffer buffer = ByteBuffer.allocate(4); @@ -1666,10 +1663,9 @@ public class TranslogTests extends OpenSearchTestCase { ) { TranslogWriter writer = translog.getCurrent(); - byte[] bytes = new byte[4]; - ByteArrayDataOutput out = new ByteArrayDataOutput(new byte[4]); + BytesStreamOutput out = new BytesStreamOutput(4); out.writeInt(1); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1); + writer.add(ReleasableBytesReference.wrap(out.bytes()), 1); assertThat(persistedSeqNos, empty()); startBlocking.set(true); Thread thread = new Thread(() -> { @@ -1683,7 +1679,7 @@ public class TranslogTests extends OpenSearchTestCase { writeStarted.await(); // Add will not block even though we are currently writing/syncing - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2); + writer.add(ReleasableBytesReference.wrap(out.bytes()), 2); blocker.countDown(); // Sync against so that both operations are written @@ -1698,11 +1694,10 @@ public class TranslogTests extends OpenSearchTestCase { try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) { final int numOps = randomIntBetween(8, 128); for (int i = 0; i < numOps; i++) { - final byte[] bytes = new byte[4]; - final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes); - out.reset(bytes); + final BytesStreamOutput out = new BytesStreamOutput(4); + out.reset(); out.writeInt(i); - writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong()); + writer.add(ReleasableBytesReference.wrap(out.bytes()), randomNonNegativeLong()); } writer.sync(); final Checkpoint writerCheckpoint = writer.getCheckpoint();