HDFS-3492. Fix some misuses of InputStream#skip. Contributed by Colin Patrick McCabe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1347191 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3f1179c966
commit
56de3ad39d
@ -42,10 +42,7 @@ public InputStreamEntity(InputStream is) {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void write(OutputStream os) throws IOException {
|
public void write(OutputStream os) throws IOException {
|
||||||
long skipped = is.skip(offset);
|
IOUtils.skipFully(is, offset);
|
||||||
if (skipped < offset) {
|
|
||||||
throw new IOException("Requested offset beyond stream size");
|
|
||||||
}
|
|
||||||
if (len == -1) {
|
if (len == -1) {
|
||||||
IOUtils.copyBytes(is, os, 4096, true);
|
IOUtils.copyBytes(is, os, 4096, true);
|
||||||
} else {
|
} else {
|
||||||
|
@ -159,6 +159,9 @@ Release 2.0.1-alpha - UNRELEASED
|
|||||||
HDFS-3505. DirectoryScanner does not join all threads in shutdown.
|
HDFS-3505. DirectoryScanner does not join all threads in shutdown.
|
||||||
(Colin Patrick McCabe via eli)
|
(Colin Patrick McCabe via eli)
|
||||||
|
|
||||||
|
HDFS-3492. Fix some misuses of InputStream#skip (Colin Patrick McCabe
|
||||||
|
via todd)
|
||||||
|
|
||||||
Release 2.0.0-alpha - 05-23-2012
|
Release 2.0.0-alpha - 05-23-2012
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||||
import org.apache.hadoop.hdfs.util.DirectBufferPool;
|
import org.apache.hadoop.hdfs.util.DirectBufferPool;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
|
|
||||||
@ -284,24 +285,11 @@ private BlockReaderLocal(Configuration conf, String hdfsfile,
|
|||||||
//Initially the buffers have nothing to read.
|
//Initially the buffers have nothing to read.
|
||||||
dataBuff.flip();
|
dataBuff.flip();
|
||||||
checksumBuff.flip();
|
checksumBuff.flip();
|
||||||
long toSkip = firstChunkOffset;
|
IOUtils.skipFully(dataIn, firstChunkOffset);
|
||||||
while (toSkip > 0) {
|
|
||||||
long skipped = dataIn.skip(toSkip);
|
|
||||||
if (skipped == 0) {
|
|
||||||
throw new IOException("Couldn't initialize input stream");
|
|
||||||
}
|
|
||||||
toSkip -= skipped;
|
|
||||||
}
|
|
||||||
if (checksumIn != null) {
|
if (checksumIn != null) {
|
||||||
long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
|
long checkSumOffset = (firstChunkOffset / bytesPerChecksum)
|
||||||
* checksumSize;
|
* checksumSize;
|
||||||
while (checkSumOffset > 0) {
|
IOUtils.skipFully(dataIn, checkSumOffset);
|
||||||
long skipped = checksumIn.skip(checkSumOffset);
|
|
||||||
if (skipped == 0) {
|
|
||||||
throw new IOException("Couldn't initialize checksum input stream");
|
|
||||||
}
|
|
||||||
checkSumOffset -= skipped;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -407,17 +395,9 @@ public synchronized long skip(long n) throws IOException {
|
|||||||
dataBuff.clear();
|
dataBuff.clear();
|
||||||
checksumBuff.clear();
|
checksumBuff.clear();
|
||||||
|
|
||||||
long dataSkipped = dataIn.skip(toskip);
|
IOUtils.skipFully(dataIn, toskip);
|
||||||
if (dataSkipped != toskip) {
|
long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
|
||||||
throw new IOException("skip error in data input stream");
|
IOUtils.skipFully(checksumIn, checkSumOffset);
|
||||||
}
|
|
||||||
long checkSumOffset = (dataSkipped / bytesPerChecksum) * checksumSize;
|
|
||||||
if (checkSumOffset > 0) {
|
|
||||||
long skipped = checksumIn.skip(checkSumOffset);
|
|
||||||
if (skipped != checkSumOffset) {
|
|
||||||
throw new IOException("skip error in checksum input stream");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// read into the middle of the chunk
|
// read into the middle of the chunk
|
||||||
if (skipBuf == null) {
|
if (skipBuf == null) {
|
||||||
|
@ -44,6 +44,7 @@
|
|||||||
import org.apache.hadoop.io.BytesWritable;
|
import org.apache.hadoop.io.BytesWritable;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
import org.apache.hadoop.io.ArrayWritable;
|
import org.apache.hadoop.io.ArrayWritable;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
import org.apache.hadoop.io.Writable;
|
import org.apache.hadoop.io.Writable;
|
||||||
import org.apache.hadoop.io.WritableFactories;
|
import org.apache.hadoop.io.WritableFactories;
|
||||||
@ -2289,9 +2290,11 @@ public FSEditLogOp readOp(boolean skipBrokenEdits) throws IOException {
|
|||||||
// 0xff, we want to skip over that region, because there's nothing
|
// 0xff, we want to skip over that region, because there's nothing
|
||||||
// interesting there.
|
// interesting there.
|
||||||
long numSkip = e.getNumAfterTerminator();
|
long numSkip = e.getNumAfterTerminator();
|
||||||
if (in.skip(numSkip) < numSkip) {
|
try {
|
||||||
|
IOUtils.skipFully(in, numSkip);
|
||||||
|
} catch (IOException t) {
|
||||||
FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
|
FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
|
||||||
"garbage after an OP_INVALID. Unexpected early EOF.");
|
"garbage after an OP_INVALID. Unexpected early EOF.", t);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -38,6 +38,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
@ -92,8 +93,7 @@ static void checkFileContent(FileSystem fs, Path name, byte[] expected,
|
|||||||
// Now read using a different API.
|
// Now read using a different API.
|
||||||
actual = new byte[expected.length-readOffset];
|
actual = new byte[expected.length-readOffset];
|
||||||
stm = fs.open(name);
|
stm = fs.open(name);
|
||||||
long skipped = stm.skip(readOffset);
|
IOUtils.skipFully(stm, readOffset);
|
||||||
Assert.assertEquals(skipped, readOffset);
|
|
||||||
//Read a small number of bytes first.
|
//Read a small number of bytes first.
|
||||||
int nread = stm.read(actual, 0, 3);
|
int nread = stm.read(actual, 0, 3);
|
||||||
nread += stm.read(actual, nread, 2);
|
nread += stm.read(actual, nread, 2);
|
||||||
|
@ -47,6 +47,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||||
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.metrics2.util.MBeans;
|
import org.apache.hadoop.metrics2.util.MBeans;
|
||||||
import org.apache.hadoop.util.DataChecksum;
|
import org.apache.hadoop.util.DataChecksum;
|
||||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||||
@ -686,7 +687,7 @@ synchronized InputStream getBlockInputStream(ExtendedBlock b
|
|||||||
public synchronized InputStream getBlockInputStream(ExtendedBlock b,
|
public synchronized InputStream getBlockInputStream(ExtendedBlock b,
|
||||||
long seekOffset) throws IOException {
|
long seekOffset) throws IOException {
|
||||||
InputStream result = getBlockInputStream(b);
|
InputStream result = getBlockInputStream(b);
|
||||||
result.skip(seekOffset);
|
IOUtils.skipFully(result, seekOffset);
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user