HDFS-9147. Fix the setting of visibleLength in ExternalBlockReader. (Colin P. McCabe via Lei (Eddy) Xu)
This commit is contained in:
parent
50741cb568
commit
e5992ef4df
|
@ -392,6 +392,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
Constructor<? extends ReplicaAccessorBuilder> ctor =
|
Constructor<? extends ReplicaAccessorBuilder> ctor =
|
||||||
cls.getConstructor();
|
cls.getConstructor();
|
||||||
ReplicaAccessorBuilder builder = ctor.newInstance();
|
ReplicaAccessorBuilder builder = ctor.newInstance();
|
||||||
|
long visibleLength = startOffset + length;
|
||||||
ReplicaAccessor accessor = builder.
|
ReplicaAccessor accessor = builder.
|
||||||
setAllowShortCircuitReads(allowShortCircuitLocalReads).
|
setAllowShortCircuitReads(allowShortCircuitLocalReads).
|
||||||
setBlock(block.getBlockId(), block.getBlockPoolId()).
|
setBlock(block.getBlockId(), block.getBlockPoolId()).
|
||||||
|
@ -401,7 +402,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
setConfiguration(configuration).
|
setConfiguration(configuration).
|
||||||
setFileName(fileName).
|
setFileName(fileName).
|
||||||
setVerifyChecksum(verifyChecksum).
|
setVerifyChecksum(verifyChecksum).
|
||||||
setVisibleLength(length).
|
setVisibleLength(visibleLength).
|
||||||
build();
|
build();
|
||||||
if (accessor == null) {
|
if (accessor == null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
|
@ -409,7 +410,7 @@ public class BlockReaderFactory implements ShortCircuitReplicaCreator {
|
||||||
cls.getName());
|
cls.getName());
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
return new ExternalBlockReader(accessor, length, startOffset);
|
return new ExternalBlockReader(accessor, visibleLength, startOffset);
|
||||||
}
|
}
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.warn("Failed to construct new object of type " +
|
LOG.warn("Failed to construct new object of type " +
|
||||||
|
|
|
@ -68,7 +68,8 @@ public final class ExternalBlockReader implements BlockReader {
|
||||||
if (n <= 0) {
|
if (n <= 0) {
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
// You can't skip past the end of the replica.
|
// You can't skip past the last offset that we want to read with this
|
||||||
|
// block reader.
|
||||||
long oldPos = pos;
|
long oldPos = pos;
|
||||||
pos += n;
|
pos += n;
|
||||||
if (pos > visibleLength) {
|
if (pos > visibleLength) {
|
||||||
|
@ -79,12 +80,11 @@ public final class ExternalBlockReader implements BlockReader {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public int available() throws IOException {
|
public int available() throws IOException {
|
||||||
// We return the amount of bytes that we haven't read yet from the
|
// We return the amount of bytes between the current offset and the visible
|
||||||
// replica, based on our current position. Some of the other block
|
// length. Some of the other block readers return a shorter length than
|
||||||
// readers return a shorter length than that. The only advantage to
|
// that. The only advantage to returning a shorter length is that the
|
||||||
// returning a shorter length is that the DFSInputStream will
|
// DFSInputStream will trash your block reader and create a new one if
|
||||||
// trash your block reader and create a new one if someone tries to
|
// someone tries to seek() beyond the available() region.
|
||||||
// seek() beyond the available() region.
|
|
||||||
long diff = visibleLength - pos;
|
long diff = visibleLength - pos;
|
||||||
if (diff > Integer.MAX_VALUE) {
|
if (diff > Integer.MAX_VALUE) {
|
||||||
return Integer.MAX_VALUE;
|
return Integer.MAX_VALUE;
|
||||||
|
|
|
@ -1444,6 +1444,9 @@ Release 2.8.0 - UNRELEASED
|
||||||
HDFS-9107. Prevent NN's unrecoverable death spiral after full GC (Daryn
|
HDFS-9107. Prevent NN's unrecoverable death spiral after full GC (Daryn
|
||||||
Sharp via Colin P. McCabe)
|
Sharp via Colin P. McCabe)
|
||||||
|
|
||||||
|
HDFS-9147. Fix the setting of visibleLength in ExternalBlockReader. (Colin
|
||||||
|
P. McCabe via Lei (Eddy) Xu)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -190,7 +190,7 @@ public class TestExternalBlockReader {
|
||||||
"than 0 at " + pos);
|
"than 0 at " + pos);
|
||||||
return 0;
|
return 0;
|
||||||
}
|
}
|
||||||
int i = 0, nread = 0, ipos;
|
int i = off, nread = 0, ipos;
|
||||||
for (ipos = (int)pos;
|
for (ipos = (int)pos;
|
||||||
(ipos < contents.length) && (nread < len);
|
(ipos < contents.length) && (nread < len);
|
||||||
ipos++) {
|
ipos++) {
|
||||||
|
@ -280,7 +280,10 @@ public class TestExternalBlockReader {
|
||||||
HdfsDataInputStream stream =
|
HdfsDataInputStream stream =
|
||||||
(HdfsDataInputStream)dfs.open(new Path("/a"));
|
(HdfsDataInputStream)dfs.open(new Path("/a"));
|
||||||
byte buf[] = new byte[TEST_LENGTH];
|
byte buf[] = new byte[TEST_LENGTH];
|
||||||
IOUtils.readFully(stream, buf, 0, TEST_LENGTH);
|
stream.seek(1000);
|
||||||
|
IOUtils.readFully(stream, buf, 1000, TEST_LENGTH - 1000);
|
||||||
|
stream.seek(0);
|
||||||
|
IOUtils.readFully(stream, buf, 0, 1000);
|
||||||
byte expected[] = DFSTestUtil.
|
byte expected[] = DFSTestUtil.
|
||||||
calculateFileContentsFromSeed(SEED, TEST_LENGTH);
|
calculateFileContentsFromSeed(SEED, TEST_LENGTH);
|
||||||
ReadStatistics stats = stream.getReadStatistics();
|
ReadStatistics stats = stream.getReadStatistics();
|
||||||
|
@ -293,7 +296,7 @@ public class TestExternalBlockReader {
|
||||||
Assert.assertNotNull(block);
|
Assert.assertNotNull(block);
|
||||||
LinkedList<SyntheticReplicaAccessor> accessorList = accessors.get(uuid);
|
LinkedList<SyntheticReplicaAccessor> accessorList = accessors.get(uuid);
|
||||||
Assert.assertNotNull(accessorList);
|
Assert.assertNotNull(accessorList);
|
||||||
Assert.assertEquals(2, accessorList.size());
|
Assert.assertEquals(3, accessorList.size());
|
||||||
SyntheticReplicaAccessor accessor = accessorList.get(0);
|
SyntheticReplicaAccessor accessor = accessorList.get(0);
|
||||||
Assert.assertTrue(accessor.builder.allowShortCircuit);
|
Assert.assertTrue(accessor.builder.allowShortCircuit);
|
||||||
Assert.assertEquals(block.getBlockPoolId(),
|
Assert.assertEquals(block.getBlockPoolId(),
|
||||||
|
@ -307,7 +310,7 @@ public class TestExternalBlockReader {
|
||||||
accessor.getGenerationStamp());
|
accessor.getGenerationStamp());
|
||||||
Assert.assertTrue(accessor.builder.verifyChecksum);
|
Assert.assertTrue(accessor.builder.verifyChecksum);
|
||||||
Assert.assertEquals(1024L, accessor.builder.visibleLength);
|
Assert.assertEquals(1024L, accessor.builder.visibleLength);
|
||||||
Assert.assertEquals(1024L, accessor.totalRead);
|
Assert.assertEquals(24L, accessor.totalRead);
|
||||||
Assert.assertEquals("", accessor.getError());
|
Assert.assertEquals("", accessor.getError());
|
||||||
Assert.assertEquals(1, accessor.numCloses);
|
Assert.assertEquals(1, accessor.numCloses);
|
||||||
byte[] tempBuf = new byte[5];
|
byte[] tempBuf = new byte[5];
|
||||||
|
|
Loading…
Reference in New Issue