[Refactor] Lucene DataInput and DataOutput to StreamInput and StreamOutput (#2035)

Lucene 9 changes from BigEndian to LittleEndian in DataInput and DataOutput.
The use of Lucene's Data IO classes are refactored in this commit to use
StreamInput which maintains the same method calls while preserving the byte
order of the data.

Signed-off-by: Nicholas Walter Knize <nknize@apache.org>
This commit is contained in:
Nick Knize 2022-02-04 14:23:18 -06:00 committed by GitHub
parent fc0d3a368d
commit 25955782b9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
18 changed files with 306 additions and 169 deletions

View File

@ -43,8 +43,8 @@ import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.TwoPhaseIterator;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.index.mapper.RangeType;
import java.io.IOException;
@ -91,7 +91,7 @@ public final class BinaryDocValuesRangeQuery extends Query {
final TwoPhaseIterator iterator = new TwoPhaseIterator(values) {
ByteArrayDataInput in = new ByteArrayDataInput();
BytesStreamInput in = new BytesStreamInput();
BytesRef otherFrom = new BytesRef();
BytesRef otherTo = new BytesRef();

View File

@ -32,10 +32,10 @@
package org.opensearch.action.search;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.RAMOutputStream;
import org.opensearch.LegacyESVersion;
import org.opensearch.Version;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.util.concurrent.AtomicArray;
import org.opensearch.search.SearchPhaseResult;
import org.opensearch.search.SearchShardTarget;
@ -88,7 +88,7 @@ final class TransportSearchHelper {
static ParsedScrollId parseScrollId(String scrollId) {
try {
byte[] bytes = Base64.getUrlDecoder().decode(scrollId);
ByteArrayDataInput in = new ByteArrayDataInput(bytes);
BytesStreamInput in = new BytesStreamInput(bytes);
final boolean includeContextUUID;
final String type;
final String firstChunk = in.readString();

View File

@ -0,0 +1,115 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.common.io.stream;
import org.apache.lucene.util.BytesRef;
import java.io.EOFException;
import java.io.IOException;
/**
* {@link StreamInput} version of Lucene's {@link org.apache.lucene.store.ByteArrayDataInput}
* This is used as a replacement of Lucene ByteArrayDataInput for abstracting byte order changes
* in Lucene's API
*
* Attribution given to apache lucene project under ALv2:
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
public class BytesStreamInput extends StreamInput {
private byte[] bytes;
private int pos;
private int limit;
public BytesStreamInput(byte[] bytes) {
reset(bytes);
}
public BytesStreamInput(byte[] bytes, int offset, int len) {
reset(bytes, offset, len);
}
public BytesStreamInput() {
reset(BytesRef.EMPTY_BYTES);
}
public void reset(byte[] bytes) {
reset(bytes, 0, bytes.length);
}
public int getPosition() {
return pos;
}
public void setPosition(int pos) {
this.pos = pos;
}
public void reset(byte[] bytes, int offset, int len) {
this.bytes = bytes;
pos = offset;
limit = offset + len;
}
public boolean eof() {
return pos == limit;
}
public void skipBytes(long count) {
pos += count;
}
// NOTE: AIOOBE not EOF if you read too much
@Override
public byte readByte() {
return bytes[pos++];
}
// NOTE: AIOOBE not EOF if you read too much
@Override
public void readBytes(byte[] b, int offset, int len) {
System.arraycopy(bytes, pos, b, offset, len);
pos += len;
}
@Override
public void close() {}
@Override
public int available() {
return limit - pos;
}
@Override
protected void ensureCanReadBytes(int length) throws EOFException {
int available = available();
if (length > available) {
throw new EOFException("attempting to read " + length + " bytes but only " + available + " bytes are available");
}
}
@Override
public int read() throws IOException {
return bytes[pos++] & 0xFF;
}
}

View File

@ -32,15 +32,11 @@
package org.opensearch.common.util;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
/** Utility methods to do byte-level encoding. These methods are biased towards little-endian byte order because it is the most
* common byte order and reading several bytes at once may be optimizable in the future with the help of sun.mist.Unsafe. */
public enum ByteUtils {
;
public final class ByteUtils {
public static final int MAX_BYTES_VLONG = 9;
private ByteUtils() {};
/** Zig-zag decode. */
public static long zigZagDecode(long n) {
@ -107,46 +103,4 @@ public enum ByteUtils {
public static float readFloatLE(byte[] arr, int offset) {
return Float.intBitsToFloat(readIntLE(arr, offset));
}
/** Same as DataOutput#writeVLong but accepts negative values (written on 9 bytes). */
public static void writeVLong(ByteArrayDataOutput out, long i) {
for (int k = 0; k < 8 && (i & ~0x7FL) != 0L; ++k) {
out.writeByte((byte) ((i & 0x7FL) | 0x80L));
i >>>= 7;
}
out.writeByte((byte) i);
}
/** Same as DataOutput#readVLong but can read negative values (read on 9 bytes). */
public static long readVLong(ByteArrayDataInput in) {
// unwinded because of hotspot bugs, see Lucene's impl
byte b = in.readByte();
if (b >= 0) return b;
long i = b & 0x7FL;
b = in.readByte();
i |= (b & 0x7FL) << 7;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 14;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 21;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 28;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 35;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 42;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0x7FL) << 49;
if (b >= 0) return i;
b = in.readByte();
i |= (b & 0xFFL) << 56;
return i;
}
}

View File

@ -33,9 +33,9 @@
package org.opensearch.index.fielddata.plain;
import org.apache.lucene.index.BinaryDocValues;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.util.Accountable;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.index.fielddata.LeafFieldData;
import org.opensearch.index.fielddata.SortedBinaryDocValues;
@ -66,7 +66,7 @@ abstract class AbstractBinaryDVLeafFieldData implements LeafFieldData {
return new SortedBinaryDocValues() {
int count;
final ByteArrayDataInput in = new ByteArrayDataInput();
final BytesStreamInput in = new BytesStreamInput();
final BytesRef scratch = new BytesRef();
@Override

View File

@ -35,11 +35,11 @@ package org.opensearch.index.mapper;
import com.carrotsearch.hppc.ObjectArrayList;
import org.apache.lucene.document.StoredField;
import org.apache.lucene.search.Query;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.BytesRef;
import org.opensearch.OpenSearchException;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.util.CollectionUtils;
import org.opensearch.common.xcontent.XContentParser;
import org.opensearch.index.fielddata.IndexFieldData;
@ -240,8 +240,7 @@ public class BinaryFieldMapper extends ParametrizedFieldMapper {
try {
CollectionUtils.sortAndDedup(bytesList);
int size = bytesList.size();
final byte[] bytes = new byte[totalSize + (size + 1) * 5];
ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
BytesStreamOutput out = new BytesStreamOutput(totalSize + (size + 1) * 5);
out.writeVInt(size); // write total number of values
for (int i = 0; i < size; i++) {
final byte[] value = bytesList.get(i);
@ -249,7 +248,7 @@ public class BinaryFieldMapper extends ParametrizedFieldMapper {
out.writeVInt(valueLength);
out.writeBytes(value, 0, valueLength);
}
return new BytesRef(bytes, 0, out.getPosition());
return out.bytes().toBytesRef();
} catch (IOException e) {
throw new OpenSearchException("Failed to get binary value", e);
}

View File

@ -33,11 +33,11 @@
package org.opensearch.index.mapper;
import org.apache.lucene.document.InetAddressPoint;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.util.BytesRef;
import org.apache.lucene.util.NumericUtils;
import org.opensearch.common.TriFunction;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import java.io.IOException;
import java.net.InetAddress;
@ -47,13 +47,12 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
enum BinaryRangeUtil {
final class BinaryRangeUtil {
;
private BinaryRangeUtil() {};
static BytesRef encodeIPRanges(Set<RangeFieldMapper.Range> ranges) throws IOException {
final byte[] encoded = new byte[5 + (16 * 2) * ranges.size()];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
final BytesStreamOutput out = new BytesStreamOutput(5 + (16 * 2) * ranges.size());
out.writeVInt(ranges.size());
for (RangeFieldMapper.Range range : ranges) {
InetAddress fromValue = (InetAddress) range.from;
@ -64,10 +63,10 @@ enum BinaryRangeUtil {
byte[] encodedToValue = InetAddressPoint.encode(toValue);
out.writeBytes(encodedToValue, 0, encodedToValue.length);
}
return new BytesRef(encoded, 0, out.getPosition());
return out.bytes().toBytesRef();
}
static List<RangeFieldMapper.Range> decodeIPRanges(BytesRef encodedRanges) {
static List<RangeFieldMapper.Range> decodeIPRanges(BytesRef encodedRanges) throws IOException {
return decodeRanges(encodedRanges, RangeType.IP, BinaryRangeUtil::decodeIP);
}
@ -83,8 +82,7 @@ enum BinaryRangeUtil {
Comparator<RangeFieldMapper.Range> toComparator = Comparator.comparingLong(range -> ((Number) range.to).longValue());
sortedRanges.sort(fromComparator.thenComparing(toComparator));
final byte[] encoded = new byte[5 + (9 * 2) * sortedRanges.size()];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
final BytesStreamOutput out = new BytesStreamOutput(5 + (9 * 2) * sortedRanges.size());
out.writeVInt(sortedRanges.size());
for (RangeFieldMapper.Range range : sortedRanges) {
byte[] encodedFrom = encodeLong(((Number) range.from).longValue());
@ -92,10 +90,10 @@ enum BinaryRangeUtil {
byte[] encodedTo = encodeLong(((Number) range.to).longValue());
out.writeBytes(encodedTo, encodedTo.length);
}
return new BytesRef(encoded, 0, out.getPosition());
return out.bytes().toBytesRef();
}
static List<RangeFieldMapper.Range> decodeLongRanges(BytesRef encodedRanges) {
static List<RangeFieldMapper.Range> decodeLongRanges(BytesRef encodedRanges) throws IOException {
return decodeRanges(encodedRanges, RangeType.LONG, BinaryRangeUtil::decodeLong);
}
@ -105,8 +103,7 @@ enum BinaryRangeUtil {
Comparator<RangeFieldMapper.Range> toComparator = Comparator.comparingDouble(range -> ((Number) range.to).doubleValue());
sortedRanges.sort(fromComparator.thenComparing(toComparator));
final byte[] encoded = new byte[5 + (8 * 2) * sortedRanges.size()];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
final BytesStreamOutput out = new BytesStreamOutput(5 + (8 * 2) * sortedRanges.size());
out.writeVInt(sortedRanges.size());
for (RangeFieldMapper.Range range : sortedRanges) {
byte[] encodedFrom = encodeDouble(((Number) range.from).doubleValue());
@ -114,14 +111,14 @@ enum BinaryRangeUtil {
byte[] encodedTo = encodeDouble(((Number) range.to).doubleValue());
out.writeBytes(encodedTo, encodedTo.length);
}
return new BytesRef(encoded, 0, out.getPosition());
return out.bytes().toBytesRef();
}
static List<RangeFieldMapper.Range> decodeDoubleRanges(BytesRef encodedRanges) {
static List<RangeFieldMapper.Range> decodeDoubleRanges(BytesRef encodedRanges) throws IOException {
return decodeRanges(encodedRanges, RangeType.DOUBLE, BinaryRangeUtil::decodeDouble);
}
static List<RangeFieldMapper.Range> decodeFloatRanges(BytesRef encodedRanges) {
static List<RangeFieldMapper.Range> decodeFloatRanges(BytesRef encodedRanges) throws IOException {
return decodeRanges(encodedRanges, RangeType.FLOAT, BinaryRangeUtil::decodeFloat);
}
@ -129,11 +126,10 @@ enum BinaryRangeUtil {
BytesRef encodedRanges,
RangeType rangeType,
TriFunction<byte[], Integer, Integer, Object> decodeBytes
) {
) throws IOException {
RangeType.LengthType lengthType = rangeType.lengthType;
ByteArrayDataInput in = new ByteArrayDataInput();
in.reset(encodedRanges.bytes, encodedRanges.offset, encodedRanges.length);
BytesStreamInput in = new BytesStreamInput(encodedRanges.bytes, encodedRanges.offset, encodedRanges.length);
int numRanges = in.readVInt();
List<RangeFieldMapper.Range> ranges = new ArrayList<>(numRanges);
@ -161,8 +157,7 @@ enum BinaryRangeUtil {
Comparator<RangeFieldMapper.Range> toComparator = Comparator.comparingDouble(range -> ((Number) range.to).floatValue());
sortedRanges.sort(fromComparator.thenComparing(toComparator));
final byte[] encoded = new byte[5 + (4 * 2) * sortedRanges.size()];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
final BytesStreamOutput out = new BytesStreamOutput(5 + (4 * 2) * sortedRanges.size());
out.writeVInt(sortedRanges.size());
for (RangeFieldMapper.Range range : sortedRanges) {
byte[] encodedFrom = encodeFloat(((Number) range.from).floatValue());
@ -170,7 +165,7 @@ enum BinaryRangeUtil {
byte[] encodedTo = encodeFloat(((Number) range.to).floatValue());
out.writeBytes(encodedTo, encodedTo.length);
}
return new BytesRef(encoded, 0, out.getPosition());
return out.bytes().toBytesRef();
}
static byte[] encodeDouble(double number) {

View File

@ -273,7 +273,7 @@ public enum RangeType {
}
@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return LONG.decodeRanges(bytes);
}
@ -375,7 +375,7 @@ public enum RangeType {
}
@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return BinaryRangeUtil.decodeFloatRanges(bytes);
}
@ -486,7 +486,7 @@ public enum RangeType {
}
@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return BinaryRangeUtil.decodeDoubleRanges(bytes);
}
@ -600,7 +600,7 @@ public enum RangeType {
}
@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return LONG.decodeRanges(bytes);
}
@ -692,7 +692,7 @@ public enum RangeType {
}
@Override
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) {
public List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException {
return BinaryRangeUtil.decodeLongRanges(bytes);
}
@ -946,7 +946,7 @@ public enum RangeType {
// rounded up via parseFrom and parseTo methods.
public abstract BytesRef encodeRanges(Set<RangeFieldMapper.Range> ranges) throws IOException;
public abstract List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes);
public abstract List<RangeFieldMapper.Range> decodeRanges(BytesRef bytes) throws IOException;
/**
* Given the Range.to or Range.from Object value from a Range instance, converts that value into a Double. Before converting, it

View File

@ -50,7 +50,6 @@ import org.apache.lucene.index.SegmentCommitInfo;
import org.apache.lucene.index.SegmentInfos;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksum;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ChecksumIndexInput;
import org.apache.lucene.store.Directory;
import org.apache.lucene.store.FilterDirectory;
@ -67,6 +66,7 @@ import org.opensearch.ExceptionsHelper;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesReference;
import org.opensearch.common.io.Streams;
import org.opensearch.common.io.stream.BytesStreamInput;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.io.stream.StreamInput;
import org.opensearch.common.io.stream.StreamOutput;
@ -1426,11 +1426,11 @@ public class Store extends AbstractIndexShardComponent implements Closeable, Ref
throw new UnsupportedOperationException();
}
public long getStoredChecksum() {
return new ByteArrayDataInput(checksum).readLong();
public long getStoredChecksum() throws IOException {
return new BytesStreamInput(checksum).readLong();
}
public long verify() throws CorruptIndexException {
public long verify() throws CorruptIndexException, IOException {
long storedChecksum = getStoredChecksum();
if (getChecksum() == storedChecksum) {
return storedChecksum;

View File

@ -70,7 +70,9 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.iterableWithSize;
import static org.hamcrest.Matchers.nullValue;
public class StreamTests extends OpenSearchTestCase {
public abstract class BaseStreamTests extends OpenSearchTestCase {
protected abstract StreamInput getStreamInput(BytesReference bytesReference) throws IOException;
public void testBooleanSerialization() throws IOException {
final BytesStreamOutput output = new BytesStreamOutput();
@ -85,7 +87,7 @@ public class StreamTests extends OpenSearchTestCase {
assertThat(bytes[0], equalTo((byte) 0));
assertThat(bytes[1], equalTo((byte) 1));
final StreamInput input = bytesReference.streamInput();
final StreamInput input = getStreamInput(bytesReference);
assertFalse(input.readBoolean());
assertTrue(input.readBoolean());
@ -114,7 +116,7 @@ public class StreamTests extends OpenSearchTestCase {
assertThat(bytes[1], equalTo((byte) 1));
assertThat(bytes[2], equalTo((byte) 2));
final StreamInput input = bytesReference.streamInput();
final StreamInput input = getStreamInput(bytesReference);
final Boolean maybeFalse = input.readOptionalBoolean();
assertNotNull(maybeFalse);
assertFalse(maybeFalse);
@ -139,7 +141,7 @@ public class StreamTests extends OpenSearchTestCase {
long write = randomLong();
BytesStreamOutput out = new BytesStreamOutput();
out.writeZLong(write);
long read = out.bytes().streamInput().readZLong();
long read = getStreamInput(out.bytes()).readZLong();
assertEquals(write, read);
}
}
@ -184,7 +186,7 @@ public class StreamTests extends OpenSearchTestCase {
}
BytesStreamOutput out = new BytesStreamOutput();
out.writeGenericValue(write);
LinkedHashMap<String, Integer> read = (LinkedHashMap<String, Integer>) out.bytes().streamInput().readGenericValue();
LinkedHashMap<String, Integer> read = (LinkedHashMap<String, Integer>) getStreamInput(out.bytes()).readGenericValue();
assertEquals(size, read.size());
int index = 0;
for (Map.Entry<String, Integer> entry : read.entrySet()) {
@ -251,10 +253,10 @@ public class StreamTests extends OpenSearchTestCase {
sourceArray = null;
}
out.writeOptionalArray(sourceArray);
targetArray = out.bytes().streamInput().readOptionalArray(WriteableString::new, WriteableString[]::new);
targetArray = getStreamInput(out.bytes()).readOptionalArray(WriteableString::new, WriteableString[]::new);
} else {
out.writeArray(sourceArray);
targetArray = out.bytes().streamInput().readArray(WriteableString::new, WriteableString[]::new);
targetArray = getStreamInput(out.bytes()).readArray(WriteableString::new, WriteableString[]::new);
}
assertThat(targetArray, equalTo(sourceArray));
@ -273,11 +275,11 @@ public class StreamTests extends OpenSearchTestCase {
strings = generateRandomStringArray(10, 10, false, true);
}
out.writeOptionalArray(writer, strings);
deserialized = out.bytes().streamInput().readOptionalArray(reader, String[]::new);
deserialized = getStreamInput(out.bytes()).readOptionalArray(reader, String[]::new);
} else {
strings = generateRandomStringArray(10, 10, false, true);
out.writeArray(writer, strings);
deserialized = out.bytes().streamInput().readArray(reader, String[]::new);
deserialized = getStreamInput(out.bytes()).readArray(reader, String[]::new);
}
assertThat(deserialized, equalTo(strings));
}
@ -342,7 +344,7 @@ public class StreamTests extends OpenSearchTestCase {
}
try (BytesStreamOutput out = new BytesStreamOutput()) {
writer.accept(out, collection);
try (StreamInput in = out.bytes().streamInput()) {
try (StreamInput in = getStreamInput(out.bytes())) {
assertThat(collection, equalTo(reader.apply(in)));
}
}
@ -359,7 +361,7 @@ public class StreamTests extends OpenSearchTestCase {
final BytesStreamOutput out = new BytesStreamOutput();
out.writeCollection(sourceSet, StreamOutput::writeLong);
final Set<Long> targetSet = out.bytes().streamInput().readSet(StreamInput::readLong);
final Set<Long> targetSet = getStreamInput(out.bytes()).readSet(StreamInput::readLong);
assertThat(targetSet, equalTo(sourceSet));
}
@ -367,7 +369,7 @@ public class StreamTests extends OpenSearchTestCase {
final Instant instant = Instant.now();
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeInstant(instant);
try (StreamInput in = out.bytes().streamInput()) {
try (StreamInput in = getStreamInput(out.bytes())) {
final Instant serialized = in.readInstant();
assertEquals(instant, serialized);
}
@ -378,7 +380,7 @@ public class StreamTests extends OpenSearchTestCase {
final Instant instant = Instant.now();
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeOptionalInstant(instant);
try (StreamInput in = out.bytes().streamInput()) {
try (StreamInput in = getStreamInput(out.bytes())) {
final Instant serialized = in.readOptionalInstant();
assertEquals(instant, serialized);
}
@ -387,7 +389,7 @@ public class StreamTests extends OpenSearchTestCase {
final Instant missing = null;
try (BytesStreamOutput out = new BytesStreamOutput()) {
out.writeOptionalInstant(missing);
try (StreamInput in = out.bytes().streamInput()) {
try (StreamInput in = getStreamInput(out.bytes())) {
final Instant serialized = in.readOptionalInstant();
assertEquals(missing, serialized);
}
@ -437,7 +439,8 @@ public class StreamTests extends OpenSearchTestCase {
output.writeSecureString(secureString);
final BytesReference bytesReference = output.bytes();
final StreamInput input = bytesReference.streamInput();
final StreamInput input = getStreamInput(bytesReference);
;
assertThat(secureString, is(equalTo(input.readSecureString())));
}
@ -447,7 +450,8 @@ public class StreamTests extends OpenSearchTestCase {
output.writeOptionalSecureString(secureString);
final BytesReference bytesReference = output.bytes();
final StreamInput input = bytesReference.streamInput();
final StreamInput input = getStreamInput(bytesReference);
;
if (secureString != null) {
assertThat(input.readOptionalSecureString(), is(equalTo(secureString)));
@ -507,7 +511,8 @@ public class StreamTests extends OpenSearchTestCase {
try (BytesStreamOutput output = new BytesStreamOutput()) {
outputAssertions.accept(output);
final BytesReference bytesReference = output.bytes();
final StreamInput input = bytesReference.streamInput();
final StreamInput input = getStreamInput(bytesReference);
;
inputAssertions.accept(input);
}
}

View File

@ -0,0 +1,25 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.common.io.stream;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.bytes.BytesReference;
import java.io.IOException;
import java.nio.ByteBuffer;
/** test the ByteBufferStreamInput using the same BaseStreamTests */
public class ByteBufferStreamInputTests extends BaseStreamTests {
@Override
protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException {
BytesRef br = bytesReference.toBytesRef();
ByteBuffer bb = ByteBuffer.wrap(br.bytes, br.offset, br.length);
return new ByteBufferStreamInput(bb);
}
}

View File

@ -0,0 +1,21 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.common.io.stream;
import org.opensearch.common.bytes.BytesReference;
import java.io.IOException;
/** test the BytesReferenceStream using the same BaseStreamTests */
public class BytesReferenceStreamInputTests extends BaseStreamTests {
@Override
protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException {
return bytesReference.streamInput();
}
}

View File

@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.common.io.stream;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.bytes.BytesReference;
import java.io.IOException;
/** test the BytesStreamInput using the same BaseStreamTests */
public class BytesStreamInputTests extends BaseStreamTests {
@Override
protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException {
BytesRef br = bytesReference.toBytesRef();
return new BytesStreamInput(br.bytes, br.offset, br.length);
}
}

View File

@ -0,0 +1,24 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.common.io.stream;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.bytes.BytesReference;
import java.io.IOException;
/** test the FilterStreamInput using the same BaseStreamTests */
public class FilterStreamInputTests extends BaseStreamTests {
@Override
protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException {
BytesRef br = bytesReference.toBytesRef();
return new FilterStreamInput(StreamInput.wrap(br.bytes, br.offset, br.length)) {
};
}
}

View File

@ -0,0 +1,23 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/
package org.opensearch.common.io.stream;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.bytes.BytesReference;
import java.io.IOException;
/** test the InputStreamStreamInput using the same BaseStreamTests */
public class InputStreamStreamInputTests extends BaseStreamTests {
@Override
protected StreamInput getStreamInput(BytesReference bytesReference) throws IOException {
BytesRef br = bytesReference.toBytesRef();
return new InputStreamStreamInput(StreamInput.wrap(br.bytes, br.offset, br.length));
}
}

View File

@ -32,8 +32,6 @@
package org.opensearch.common.util;
import org.apache.lucene.store.ByteArrayDataInput;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.opensearch.test.OpenSearchTestCase;
import java.io.IOException;
@ -81,42 +79,4 @@ public class ByteUtilsTests extends OpenSearchTestCase {
}
}
public void testVLong() throws IOException {
final long[] data = new long[scaledRandomIntBetween(1000, 10000)];
for (int i = 0; i < data.length; ++i) {
switch (randomInt(4)) {
case 0:
data[i] = 0;
break;
case 1:
data[i] = Long.MAX_VALUE;
break;
case 2:
data[i] = Long.MIN_VALUE;
break;
case 3:
data[i] = randomInt(1 << randomIntBetween(2, 30));
break;
case 4:
data[i] = randomLong();
break;
default:
throw new AssertionError();
}
}
final byte[] encoded = new byte[ByteUtils.MAX_BYTES_VLONG * data.length];
ByteArrayDataOutput out = new ByteArrayDataOutput(encoded);
for (int i = 0; i < data.length; ++i) {
final int pos = out.getPosition();
ByteUtils.writeVLong(out, data[i]);
if (data[i] < 0) {
assertEquals(ByteUtils.MAX_BYTES_VLONG, out.getPosition() - pos);
}
}
final ByteArrayDataInput in = new ByteArrayDataInput(encoded);
for (int i = 0; i < data.length; ++i) {
assertEquals(data[i], ByteUtils.readVLong(in));
}
}
}

View File

@ -32,11 +32,10 @@
package org.opensearch.index.translog;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.opensearch.common.UUIDs;
import org.opensearch.common.bytes.BytesArray;
import org.opensearch.common.bytes.ReleasableBytesReference;
import org.opensearch.common.collect.Tuple;
import org.opensearch.common.io.stream.BytesStreamOutput;
import org.opensearch.common.lease.Releasable;
import org.opensearch.common.util.BigArrays;
import org.opensearch.core.internal.io.IOUtils;
@ -261,15 +260,14 @@ public class TranslogDeletionPolicyTests extends OpenSearchTestCase {
);
writer = Mockito.spy(writer);
Mockito.doReturn(now - (numberOfReaders - gen + 1) * 1000).when(writer).getLastModifiedTime();
byte[] bytes = new byte[4];
ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
BytesStreamOutput out = new BytesStreamOutput(4);
final long startSeqNo = (gen - 1) * TOTAL_OPS_IN_GEN;
final long endSeqNo = startSeqNo + TOTAL_OPS_IN_GEN - 1;
for (long ops = endSeqNo; ops >= startSeqNo; ops--) {
out.reset(bytes);
out.reset();
out.writeInt((int) ops);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), ops);
writer.add(ReleasableBytesReference.wrap(out.bytes()), ops);
}
}
return new Tuple<>(readers, writer);

View File

@ -43,7 +43,6 @@ import org.apache.lucene.index.Term;
import org.apache.lucene.mockfile.FilterFileChannel;
import org.apache.lucene.mockfile.FilterFileSystemProvider;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.ByteArrayDataOutput;
import org.apache.lucene.store.MockDirectoryWrapper;
import org.apache.lucene.util.LineFileDocs;
import org.apache.lucene.util.LuceneTestCase;
@ -1424,8 +1423,7 @@ public class TranslogTests extends OpenSearchTestCase {
final Set<Long> seenSeqNos = new HashSet<>();
boolean opsHaveValidSequenceNumbers = randomBoolean();
for (int i = 0; i < numOps; i++) {
byte[] bytes = new byte[4];
ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
BytesStreamOutput out = new BytesStreamOutput(4);
out.writeInt(i);
long seqNo;
do {
@ -1435,7 +1433,7 @@ public class TranslogTests extends OpenSearchTestCase {
if (seqNo != SequenceNumbers.UNASSIGNED_SEQ_NO) {
seenSeqNos.add(seqNo);
}
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), seqNo);
writer.add(ReleasableBytesReference.wrap(out.bytes()), seqNo);
}
assertThat(persistedSeqNos, empty());
writer.sync();
@ -1457,10 +1455,9 @@ public class TranslogTests extends OpenSearchTestCase {
assertThat(reader.getCheckpoint().minSeqNo, equalTo(minSeqNo));
assertThat(reader.getCheckpoint().maxSeqNo, equalTo(maxSeqNo));
byte[] bytes = new byte[4];
ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
BytesStreamOutput out = new BytesStreamOutput(4);
out.writeInt(2048);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
writer.add(ReleasableBytesReference.wrap(out.bytes()), randomNonNegativeLong());
if (reader instanceof TranslogReader) {
ByteBuffer buffer = ByteBuffer.allocate(4);
@ -1666,10 +1663,9 @@ public class TranslogTests extends OpenSearchTestCase {
) {
TranslogWriter writer = translog.getCurrent();
byte[] bytes = new byte[4];
ByteArrayDataOutput out = new ByteArrayDataOutput(new byte[4]);
BytesStreamOutput out = new BytesStreamOutput(4);
out.writeInt(1);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 1);
writer.add(ReleasableBytesReference.wrap(out.bytes()), 1);
assertThat(persistedSeqNos, empty());
startBlocking.set(true);
Thread thread = new Thread(() -> {
@ -1683,7 +1679,7 @@ public class TranslogTests extends OpenSearchTestCase {
writeStarted.await();
// Add will not block even though we are currently writing/syncing
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), 2);
writer.add(ReleasableBytesReference.wrap(out.bytes()), 2);
blocker.countDown();
// Sync against so that both operations are written
@ -1698,11 +1694,10 @@ public class TranslogTests extends OpenSearchTestCase {
try (TranslogWriter writer = translog.createWriter(translog.currentFileGeneration() + 1)) {
final int numOps = randomIntBetween(8, 128);
for (int i = 0; i < numOps; i++) {
final byte[] bytes = new byte[4];
final ByteArrayDataOutput out = new ByteArrayDataOutput(bytes);
out.reset(bytes);
final BytesStreamOutput out = new BytesStreamOutput(4);
out.reset();
out.writeInt(i);
writer.add(ReleasableBytesReference.wrap(new BytesArray(bytes)), randomNonNegativeLong());
writer.add(ReleasableBytesReference.wrap(out.bytes()), randomNonNegativeLong());
}
writer.sync();
final Checkpoint writerCheckpoint = writer.getCheckpoint();