Revert HDFS-3492 from r1347192: patch broke TestShortCircuitLocalRead
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1347796 13f79535-47bb-0310-9956-ffa450edef68
parent 6d8efb7378
commit 05a73a3a1e
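For context: HDFS-3492 replaced direct InputStream#skip calls with IOUtils.skipFully,
because skip() may legitimately skip fewer bytes than requested. A minimal sketch of
that fully-skipping loop (assumed class and method names, not the actual Hadoop
IOUtils implementation):

    import java.io.IOException;
    import java.io.InputStream;

    class SkipSketch {
      // skip() may return fewer bytes than requested, so keep calling it
      // until the full count has been consumed or the stream makes no progress.
      static void skipFully(InputStream in, long count) throws IOException {
        while (count > 0) {
          long skipped = in.skip(count);
          if (skipped <= 0) {
            throw new IOException("Premature EOF while skipping " + count + " bytes");
          }
          count -= skipped;
        }
      }
    }

The hunks below restore the original skip()-based call sites.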
@@ -42,7 +42,10 @@ public class InputStreamEntity implements StreamingOutput {
   @Override
   public void write(OutputStream os) throws IOException {
-    IOUtils.skipFully(is, offset);
+    long skipped = is.skip(offset);
+    if (skipped < offset) {
+      throw new IOException("Requested offset beyond stream size");
+    }
     if (len == -1) {
       IOUtils.copyBytes(is, os, 4096, true);
     } else {
@@ -310,9 +310,6 @@ Branch-2 ( Unreleased changes )
     HDFS-3505. DirectoryScanner does not join all threads in shutdown.
     (Colin Patrick McCabe via eli)
 
-    HDFS-3492. Fix some misuses of InputStream#skip (Colin Patrick McCabe
-    via todd)
-
     HDFS-3485. DataTransferThrottler will over-throttle when currentTimeMillis
     jumps (Andy Isaacson via todd)
 
@@ -39,7 +39,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.ipc.RPC;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
 
@@ -316,10 +315,23 @@ class BlockReaderLocal implements BlockReader {
     boolean success = false;
     try {
       // Skip both input streams to beginning of the chunk containing startOffset
-      IOUtils.skipFully(dataIn, firstChunkOffset);
+      long toSkip = firstChunkOffset;
+      while (toSkip > 0) {
+        long skipped = dataIn.skip(toSkip);
+        if (skipped == 0) {
+          throw new IOException("Couldn't initialize input stream");
+        }
+        toSkip -= skipped;
+      }
       if (checksumIn != null) {
         long checkSumOffset = (firstChunkOffset / bytesPerChecksum) * checksumSize;
-        IOUtils.skipFully(dataIn, checkSumOffset);
+        while (checkSumOffset > 0) {
+          long skipped = checksumIn.skip(checkSumOffset);
+          if (skipped == 0) {
+            throw new IOException("Couldn't initialize checksum input stream");
+          }
+          checkSumOffset -= skipped;
+        }
       }
       success = true;
     } finally {
@@ -624,9 +636,17 @@ class BlockReaderLocal implements BlockReader {
       slowReadBuff.position(slowReadBuff.limit());
       checksumBuff.position(checksumBuff.limit());
 
-      IOUtils.skipFully(dataIn, toskip);
-      long checkSumOffset = (toskip / bytesPerChecksum) * checksumSize;
-      IOUtils.skipFully(checksumIn, checkSumOffset);
+      long dataSkipped = dataIn.skip(toskip);
+      if (dataSkipped != toskip) {
+        throw new IOException("skip error in data input stream");
+      }
+      long checkSumOffset = (dataSkipped / bytesPerChecksum) * checksumSize;
+      if (checkSumOffset > 0) {
+        long skipped = checksumIn.skip(checkSumOffset);
+        if (skipped != checkSumOffset) {
+          throw new IOException("skip error in checksum input stream");
+        }
+      }
 
       // read into the middle of the chunk
       if (skipBuf == null) {
@@ -681,4 +701,4 @@ class BlockReaderLocal implements BlockReader {
   public boolean hasSentStatusCode() {
     return false;
   }
-}
+}
@@ -44,7 +44,6 @@ import org.apache.hadoop.security.token.delegation.DelegationKey;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.DataOutputBuffer;
 import org.apache.hadoop.io.ArrayWritable;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableFactories;
@@ -2290,11 +2289,9 @@ public abstract class FSEditLogOp {
         // 0xff, we want to skip over that region, because there's nothing
         // interesting there.
         long numSkip = e.getNumAfterTerminator();
-        try {
-          IOUtils.skipFully(in, numSkip);
-        } catch (IOException t) {
+        if (in.skip(numSkip) < numSkip) {
           FSImage.LOG.error("Failed to skip " + numSkip + " bytes of " +
-              "garbage after an OP_INVALID. Unexpected early EOF.", t);
+              "garbage after an OP_INVALID. Unexpected early EOF.");
           return null;
         }
       } catch (IOException e) {
@@ -40,7 +40,6 @@ import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.junit.Assert;
@@ -95,7 +94,8 @@ public class TestShortCircuitLocalRead {
     // Now read using a different API.
     actual = new byte[expected.length-readOffset];
     stm = fs.open(name);
-    IOUtils.skipFully(stm, readOffset);
+    long skipped = stm.skip(readOffset);
+    Assert.assertEquals(skipped, readOffset);
     //Read a small number of bytes first.
     int nread = stm.read(actual, 0, 3);
     nread += stm.read(actual, nread, 2);
@@ -123,7 +123,8 @@ public class TestShortCircuitLocalRead {
 
     ByteBuffer actual = ByteBuffer.allocate(expected.length - readOffset);
 
-    IOUtils.skipFully(stm, readOffset);
+    long skipped = stm.skip(readOffset);
+    Assert.assertEquals(skipped, readOffset);
 
     actual.limit(3);
 
@@ -47,7 +47,6 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
 import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.metrics2.util.MBeans;
 import org.apache.hadoop.util.DataChecksum;
 import org.apache.hadoop.util.DiskChecker.DiskErrorException;
@@ -687,7 +686,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
   public synchronized InputStream getBlockInputStream(ExtendedBlock b,
       long seekOffset) throws IOException {
     InputStream result = getBlockInputStream(b);
-    IOUtils.skipFully(result, seekOffset);
+    result.skip(seekOffset);
     return result;
   }
 