Use int for number of parts in blob store (#61618)
Today we use `long` to represent the number of parts of a blob. There's no need for this extra range, it forces us to do some casting elsewhere, and indeed when snapshotting we iterate over the parts using an `int` which would be an infinite loop in case of overflow anyway: for (int i = 0; i < fileInfo.numberOfParts(); i++) { This commit changes the representation of the number of parts of a blob to an `int`.
This commit is contained in:
parent
aac9eb6b64
commit
b866aaf81c
|
@ -36,6 +36,7 @@ import java.io.IOException;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
/**
|
||||
* Shard snapshot metadata
|
||||
|
@ -50,7 +51,7 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
|
|||
private final String name;
|
||||
private final ByteSizeValue partSize;
|
||||
private final long partBytes;
|
||||
private final long numberOfParts;
|
||||
private final int numberOfParts;
|
||||
private final StoreFileMetadata metadata;
|
||||
|
||||
/**
|
||||
|
@ -69,17 +70,19 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
|
|||
partBytes = partSize.getBytes();
|
||||
}
|
||||
|
||||
long totalLength = metadata.length();
|
||||
long numberOfParts = totalLength / partBytes;
|
||||
if (totalLength % partBytes > 0) {
|
||||
numberOfParts++;
|
||||
if (metadata.length() == 0) {
|
||||
numberOfParts = 1;
|
||||
} else {
|
||||
long longNumberOfParts = 1L + (metadata.length() - 1L) / partBytes; // ceil(len/partBytes), but beware of long overflow
|
||||
numberOfParts = (int)longNumberOfParts;
|
||||
if (numberOfParts != longNumberOfParts) { // also beware of int overflow, although 2^32 parts is already ludicrous
|
||||
throw new IllegalArgumentException("part size [" + partSize + "] too small for file [" + metadata + "]");
|
||||
}
|
||||
if (numberOfParts == 0) {
|
||||
numberOfParts++;
|
||||
}
|
||||
this.numberOfParts = numberOfParts;
|
||||
|
||||
this.partSize = partSize;
|
||||
this.partBytes = partBytes;
|
||||
assert IntStream.range(0, numberOfParts).mapToLong(this::partBytes).sum() == metadata.length();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -97,7 +100,7 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
|
|||
* @param part part number
|
||||
* @return part name
|
||||
*/
|
||||
public String partName(long part) {
|
||||
public String partName(int part) {
|
||||
if (numberOfParts > 1) {
|
||||
return name + ".part" + part;
|
||||
} else {
|
||||
|
@ -151,6 +154,7 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
|
|||
* @return the size (in bytes) of a given part
|
||||
*/
|
||||
public long partBytes(int part) {
|
||||
assert 0 <= part && part < numberOfParts : part + " vs " + numberOfParts;
|
||||
if (numberOfParts == 1) {
|
||||
return length();
|
||||
}
|
||||
|
@ -159,7 +163,9 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
|
|||
return partBytes;
|
||||
}
|
||||
// Last part size is deducted from the length and the number of parts
|
||||
return length() - (partBytes * (numberOfParts-1));
|
||||
final long lastPartBytes = length() - (this.partBytes * (numberOfParts - 1));
|
||||
assert 0 < lastPartBytes && lastPartBytes <= partBytes : lastPartBytes + " vs " + partBytes;
|
||||
return lastPartBytes;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -167,7 +173,7 @@ public class BlobStoreIndexShardSnapshot implements ToXContentFragment {
|
|||
*
|
||||
* @return number of parts
|
||||
*/
|
||||
public long numberOfParts() {
|
||||
public int numberOfParts() {
|
||||
return numberOfParts;
|
||||
}
|
||||
|
||||
|
|
|
@ -27,21 +27,21 @@ import java.io.InputStream;
|
|||
* A {@link SlicedInputStream} is a logical
|
||||
* concatenation one or more input streams. In contrast to the JDKs
|
||||
* {@link java.io.SequenceInputStream} this stream doesn't require the instantiation
|
||||
* of all logical sub-streams ahead of time. Instead, {@link #openSlice(long)} is called
|
||||
* of all logical sub-streams ahead of time. Instead, {@link #openSlice(int)} is called
|
||||
* if a new slice is required. Each slice is closed once it's been fully consumed or if
|
||||
* close is called before.
|
||||
*/
|
||||
public abstract class SlicedInputStream extends InputStream {
|
||||
private long slice = 0;
|
||||
private int slice = 0;
|
||||
private InputStream currentStream;
|
||||
private final long numSlices;
|
||||
private final int numSlices;
|
||||
private boolean initialized = false;
|
||||
|
||||
/**
|
||||
* Creates a new SlicedInputStream
|
||||
* @param numSlices the number of slices to consume
|
||||
*/
|
||||
protected SlicedInputStream(final long numSlices) {
|
||||
protected SlicedInputStream(final int numSlices) {
|
||||
this.numSlices = numSlices;
|
||||
}
|
||||
|
||||
|
@ -60,7 +60,7 @@ public abstract class SlicedInputStream extends InputStream {
|
|||
/**
|
||||
* Called for each logical slice given a zero based slice ordinal.
|
||||
*/
|
||||
protected abstract InputStream openSlice(long slice) throws IOException;
|
||||
protected abstract InputStream openSlice(int slice) throws IOException;
|
||||
|
||||
private InputStream currentStream() throws IOException {
|
||||
if (currentStream == null) {
|
||||
|
|
|
@ -2098,7 +2098,7 @@ public abstract class BlobStoreRepository extends AbstractLifecycleComponent imp
|
|||
} else {
|
||||
try (InputStream stream = maybeRateLimitRestores(new SlicedInputStream(fileInfo.numberOfParts()) {
|
||||
@Override
|
||||
protected InputStream openSlice(long slice) throws IOException {
|
||||
protected InputStream openSlice(int slice) throws IOException {
|
||||
return container.readBlob(fileInfo.partName(slice));
|
||||
}
|
||||
})) {
|
||||
|
|
|
@ -170,6 +170,5 @@ public class FileInfoTests extends ESTestCase {
|
|||
}
|
||||
assertEquals(numBytes, metadata.length());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -60,10 +60,9 @@ public class SlicedInputStreamTests extends ESTestCase {
|
|||
}
|
||||
|
||||
SlicedInputStream input = new SlicedInputStream(parts) {
|
||||
|
||||
@Override
|
||||
protected InputStream openSlice(long slice) throws IOException {
|
||||
return streams[(int)slice];
|
||||
protected InputStream openSlice(int slice) throws IOException {
|
||||
return streams[slice];
|
||||
}
|
||||
};
|
||||
random = new Random(seed);
|
||||
|
|
|
@ -439,7 +439,7 @@ public class SearchableSnapshotDirectory extends BaseDirectory {
|
|||
final IndexInput input = openInput(file.physicalName(), CachedBlobContainerIndexInput.CACHE_WARMING_CONTEXT);
|
||||
assert input instanceof CachedBlobContainerIndexInput : "expected cached index input but got " + input.getClass();
|
||||
|
||||
final int numberOfParts = Math.toIntExact(file.numberOfParts());
|
||||
final int numberOfParts = file.numberOfParts();
|
||||
final StepListener<Collection<Void>> fileCompletionListener = new StepListener<>();
|
||||
fileCompletionListener.whenComplete(voids -> input.close(), e -> IOUtils.closeWhileHandlingException(input));
|
||||
fileCompletionListener.whenComplete(voids -> completionListener.onResponse(null), completionListener::onFailure);
|
||||
|
|
|
@ -655,12 +655,12 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|||
+ fileInfo
|
||||
+ "]";
|
||||
stats.addBlobStoreBytesRequested(length);
|
||||
return blobContainer.readBlob(fileInfo.partName(0L), position, length);
|
||||
return blobContainer.readBlob(fileInfo.partName(0), position, length);
|
||||
} else {
|
||||
final long startPart = getPartNumberForPosition(position);
|
||||
final long endPart = getPartNumberForPosition(position + length - 1);
|
||||
final int startPart = getPartNumberForPosition(position);
|
||||
final int endPart = getPartNumberForPosition(position + length - 1);
|
||||
|
||||
for (long currentPart = startPart; currentPart <= endPart; currentPart++) {
|
||||
for (int currentPart = startPart; currentPart <= endPart; currentPart++) {
|
||||
final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
|
||||
final long endInPart = (currentPart == endPart)
|
||||
? getRelativePositionInPart(position + length - 1) + 1
|
||||
|
@ -668,10 +668,10 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|||
stats.addBlobStoreBytesRequested(endInPart - startInPart);
|
||||
}
|
||||
|
||||
return new SlicedInputStream(endPart - startPart + 1L) {
|
||||
return new SlicedInputStream(endPart - startPart + 1) {
|
||||
@Override
|
||||
protected InputStream openSlice(long slice) throws IOException {
|
||||
final long currentPart = startPart + slice;
|
||||
protected InputStream openSlice(int slice) throws IOException {
|
||||
final int currentPart = startPart + slice;
|
||||
final long startInPart = (currentPart == startPart) ? getRelativePositionInPart(position) : 0L;
|
||||
final long endInPart = (currentPart == endPart)
|
||||
? getRelativePositionInPart(position + length - 1) + 1
|
||||
|
@ -685,11 +685,11 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|||
/**
|
||||
* Compute the part number that contains the byte at the given position in the corresponding Lucene file.
|
||||
*/
|
||||
private long getPartNumberForPosition(long position) {
|
||||
private int getPartNumberForPosition(long position) {
|
||||
ensureValidPosition(position);
|
||||
final long part = position / fileInfo.partSize().getBytes();
|
||||
final int part = Math.toIntExact(position / fileInfo.partSize().getBytes());
|
||||
assert part <= fileInfo.numberOfParts() : "part number [" + part + "] exceeds number of parts: " + fileInfo.numberOfParts();
|
||||
assert part >= 0L : "part number [" + part + "] is negative";
|
||||
assert part >= 0 : "part number [" + part + "] is negative";
|
||||
return part;
|
||||
}
|
||||
|
||||
|
@ -700,13 +700,13 @@ public class CachedBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|||
private long getRelativePositionInPart(long position) {
|
||||
ensureValidPosition(position);
|
||||
final long pos = position % fileInfo.partSize().getBytes();
|
||||
assert pos < fileInfo.partBytes((int) getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length";
|
||||
assert pos < fileInfo.partBytes(getPartNumberForPosition(pos)) : "position in part [" + pos + "] exceeds part's length";
|
||||
assert pos >= 0L : "position in part [" + pos + "] is negative";
|
||||
return pos;
|
||||
}
|
||||
|
||||
private long getLengthOfPart(long part) {
|
||||
return fileInfo.partBytes(toIntBytes(part));
|
||||
private long getLengthOfPart(int part) {
|
||||
return fileInfo.partBytes(part);
|
||||
}
|
||||
|
||||
private void ensureValidPosition(long position) {
|
||||
|
|
|
@ -105,18 +105,18 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|||
@Override
|
||||
protected void readInternal(ByteBuffer b) throws IOException {
|
||||
ensureOpen();
|
||||
if (fileInfo.numberOfParts() == 1L) {
|
||||
if (fileInfo.numberOfParts() == 1) {
|
||||
readInternalBytes(0, position, b, b.remaining());
|
||||
} else {
|
||||
while (b.hasRemaining()) {
|
||||
int currentPart = Math.toIntExact(position / fileInfo.partSize().getBytes());
|
||||
int remainingBytesInPart;
|
||||
long remainingBytesInPart;
|
||||
if (currentPart < (fileInfo.numberOfParts() - 1)) {
|
||||
remainingBytesInPart = toIntBytes(((currentPart + 1L) * fileInfo.partSize().getBytes()) - position);
|
||||
remainingBytesInPart = ((currentPart + 1) * fileInfo.partSize().getBytes()) - position;
|
||||
} else {
|
||||
remainingBytesInPart = toIntBytes(fileInfo.length() - position);
|
||||
}
|
||||
final int read = Math.min(b.remaining(), remainingBytesInPart);
|
||||
final int read = toIntBytes(Math.min(b.remaining(), remainingBytesInPart));
|
||||
readInternalBytes(currentPart, position % fileInfo.partSize().getBytes(), b, read);
|
||||
}
|
||||
}
|
||||
|
@ -211,8 +211,8 @@ public class DirectBlobContainerIndexInput extends BaseSearchableSnapshotIndexIn
|
|||
// it and keep it open for future reads
|
||||
final InputStream inputStream = openBlobStream(part, pos, streamLength);
|
||||
streamForSequentialReads = new StreamForSequentialReads(new FilterInputStream(inputStream) {
|
||||
private LongAdder bytesRead = new LongAdder();
|
||||
private LongAdder timeNanos = new LongAdder();
|
||||
private final LongAdder bytesRead = new LongAdder();
|
||||
private final LongAdder timeNanos = new LongAdder();
|
||||
|
||||
private int onOptimizedRead(CheckedSupplier<Integer, IOException> read) throws IOException {
|
||||
final long startTimeNanos = stats.currentTimeNanos();
|
||||
|
|
|
@ -79,18 +79,18 @@ public class DirectBlobContainerIndexInputTests extends ESIndexInputTestCase {
|
|||
onReadBlob.run();
|
||||
|
||||
final InputStream stream;
|
||||
if (fileInfo.numberOfParts() == 1L) {
|
||||
if (fileInfo.numberOfParts() == 1) {
|
||||
assertThat("Unexpected blob name [" + name + "]", name, equalTo(fileInfo.name()));
|
||||
stream = new ByteArrayInputStream(input, toIntBytes(position), toIntBytes(length));
|
||||
|
||||
} else {
|
||||
assertThat("Unexpected blob name [" + name + "]", name, allOf(startsWith(fileInfo.name()), containsString(".part")));
|
||||
|
||||
long partNumber = Long.parseLong(name.substring(name.indexOf(".part") + ".part".length()));
|
||||
int partNumber = Integer.parseInt(name.substring(name.indexOf(".part") + ".part".length()));
|
||||
assertThat(
|
||||
"Unexpected part number [" + partNumber + "] for [" + name + "]",
|
||||
partNumber,
|
||||
allOf(greaterThanOrEqualTo(0L), lessThan(fileInfo.numberOfParts()))
|
||||
allOf(greaterThanOrEqualTo(0), lessThan(fileInfo.numberOfParts()))
|
||||
);
|
||||
|
||||
stream = new ByteArrayInputStream(input, toIntBytes(partNumber * partSize + position), toIntBytes(length));
|
||||
|
|
Loading…
Reference in New Issue