From 5c779078dd0466f68e36755af9a2b5e47f71fe3f Mon Sep 17 00:00:00 2001
From: Eli Collins
Date: Tue, 13 Dec 2011 08:16:40 +0000
Subject: [PATCH] HDFS-2654. svn merge -c 1213592/1213593 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1213594 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt      |  2 +
 .../apache/hadoop/hdfs/BlockReaderLocal.java     | 82 +++++++++++++++----
 .../apache/hadoop/hdfs/BlockReaderUtil.java      | 54 ++++++++++++
 .../hadoop/hdfs/RemoteBlockReader2.java          | 40 +++------
 4 files changed, 134 insertions(+), 44 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 98137d33c6c..69ddd001817 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -50,6 +50,8 @@ Release 0.23.1 - UNRELEASED
 
     HDFS-2511. Add dev script to generate HDFS protobufs. (tucu)
 
+    HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. (eli)
+
   OPTIMIZATIONS
 
     HDFS-2130. Switch default checksum to CRC32C. (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
index d34d74d4388..6c7f829f56f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderLocal.java
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.File;
 import java.io.FileInputStream;
 import java.io.IOException;
+import java.net.Socket;
 import java.nio.ByteBuffer;
 import java.util.Collections;
 import java.util.HashMap;
@@ -37,6 +38,7 @@ import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
 import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
 import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
 import org.apache.hadoop.hdfs.server.datanode.FSDataset;
+import org.apache.hadoop.hdfs.util.DirectBufferPool;
 import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.util.DataChecksum;
@@ -57,8 +59,8 @@ import org.apache.hadoop.util.DataChecksum;
  * if security is enabled.
  *
  */
-class BlockReaderLocal extends RemoteBlockReader2 {
-  public static final Log LOG = LogFactory.getLog(DFSClient.class);
+class BlockReaderLocal implements BlockReader {
+  private static final Log LOG = LogFactory.getLog(DFSClient.class);
 
   //Stores the cache and proxy for a local datanode.
   private static class LocalDatanodeInfo {
@@ -117,13 +119,24 @@ class BlockReaderLocal extends RemoteBlockReader2 {
   private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
 
   private final FileInputStream dataIn; // reader for the data file
-
   private FileInputStream checksumIn;   // reader for the checksum file
 
   private int offsetFromChunkBoundary;
 
-  ByteBuffer dataBuff = null;
-  ByteBuffer checksumBuff = null;
+  private byte[] skipBuf = null;
+  private ByteBuffer dataBuff = null;
+  private ByteBuffer checksumBuff = null;
+  private DataChecksum checksum;
+  private final boolean verifyChecksum;
+
+  private static DirectBufferPool bufferPool = new DirectBufferPool();
+
+  private int bytesPerChecksum;
+  private int checksumSize;
+
+  /** offset in block where reader wants to actually read */
+  private long startOffset;
+  private final String filename;
 
   /**
    * The only way this object can be instantiated.
@@ -256,9 +269,14 @@ class BlockReaderLocal extends RemoteBlockReader2 {
       long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
       boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
       FileInputStream checksumIn) throws IOException {
-    super(hdfsfile, block.getBlockPoolId(), block.getBlockId(), dataIn
-        .getChannel(), checksum, verifyChecksum, startOffset, firstChunkOffset,
-        length, null);
+    this.filename = hdfsfile;
+    this.checksum = checksum;
+    this.verifyChecksum = verifyChecksum;
+    this.startOffset = Math.max(startOffset, 0);
+
+    bytesPerChecksum = this.checksum.getBytesPerChecksum();
+    checksumSize = this.checksum.getChecksumSize();
+
     this.dataIn = dataIn;
     this.checksumIn = checksumIn;
     this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
@@ -322,10 +340,8 @@ class BlockReaderLocal extends RemoteBlockReader2 {
         readIntoBuffer(checksumIn, checksumBuff);
         checksumBuff.flip();
         dataBuff.flip();
-        if (verifyChecksum) {
-          checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
-              this.startOffset);
-        }
+        checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
+            this.startOffset);
       } else {
         dataRead = dataBuff.remaining();
       }
@@ -356,9 +372,24 @@
     }
     if (!verifyChecksum) {
       return dataIn.skip(n);
-    } else {
-      return super.skip(n);
     }
+    // Skip by reading the data so we stay in sync with checksums.
+    // This could be implemented more efficiently in the future to
+    // skip to the beginning of the appropriate checksum chunk
+    // and then only read to the middle of that chunk.
+    if (skipBuf == null) {
+      skipBuf = new byte[bytesPerChecksum];
+    }
+    long nSkipped = 0;
+    while ( nSkipped < n ) {
+      int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
+      int ret = read(skipBuf, 0, toSkip);
+      if ( ret <= 0 ) {
+        return nSkipped;
+      }
+      nSkipped += ret;
+    }
+    return nSkipped;
   }
 
   @Override
@@ -375,6 +406,27 @@
       bufferPool.returnBuffer(checksumBuff);
       checksumBuff = null;
     }
-    super.close();
+    startOffset = -1;
+    checksum = null;
+  }
+
+  @Override
+  public int readAll(byte[] buf, int offset, int len) throws IOException {
+    return BlockReaderUtil.readAll(this, buf, offset, len);
+  }
+
+  @Override
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
+  }
+
+  @Override
+  public Socket takeSocket() {
+    return null;
+  }
+
+  @Override
+  public boolean hasSentStatusCode() {
+    return false;
   }
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
new file mode 100644
index 00000000000..a9f5c85bcad
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockReaderUtil.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.io.IOException;
+
+/**
+ * For sharing between the local and remote block reader implementations.
+ */
+class BlockReaderUtil {
+
+  /* See {@link BlockReader#readAll(byte[], int, int)} */
+  public static int readAll(BlockReader reader,
+      byte[] buf, int offset, int len) throws IOException {
+    int n = 0;
+    for (;;) {
+      int nread = reader.read(buf, offset + n, len - n);
+      if (nread <= 0)
+        return (n == 0) ? nread : n;
+      n += nread;
+      if (n >= len)
+        return n;
+    }
+  }
+
+  /* See {@link BlockReader#readFully(byte[], int, int)} */
+  public static void readFully(BlockReader reader,
+      byte[] buf, int off, int len) throws IOException {
+    int toRead = len;
+    while (toRead > 0) {
+      int ret = reader.read(buf, off, toRead);
+      if (ret < 0) {
+        throw new IOException("Premature EOF from inputStream");
+      }
+      toRead -= ret;
+      off += ret;
+    }
+  }
+}
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
index 1f5f12bda75..ea247775714 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/RemoteBlockReader2.java
@@ -85,7 +85,7 @@ public class RemoteBlockReader2 implements BlockReader {
 
   Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
   private ReadableByteChannel in;
-  protected DataChecksum checksum;
+  private DataChecksum checksum;
 
   private PacketHeader curHeader;
   private ByteBuffer curPacketBuf = null;
@@ -96,25 +96,24 @@ public class RemoteBlockReader2 implements BlockReader {
   private long lastSeqNo = -1;
 
   /** offset in block where reader wants to actually read */
-  protected long startOffset;
-  protected final String filename;
+  private long startOffset;
+  private final String filename;
 
-  protected static DirectBufferPool bufferPool =
-      new DirectBufferPool();
+  private static DirectBufferPool bufferPool = new DirectBufferPool();
   private ByteBuffer headerBuf = ByteBuffer.allocate(
       PacketHeader.PKT_HEADER_LEN);
 
-  protected int bytesPerChecksum;
-  protected int checksumSize;
+  private int bytesPerChecksum;
+  private int checksumSize;
 
   /**
    * The total number of bytes we need to transfer from the DN.
    * This is the amount that the user has requested plus some padding
    * at the beginning so that the read can begin on a chunk boundary.
    */
-  protected long bytesNeededToFinish;
+  private long bytesNeededToFinish;
 
-  protected final boolean verifyChecksum;
+  private final boolean verifyChecksum;
 
   private boolean sentStatusCode = false;
 
@@ -389,29 +388,12 @@ public class RemoteBlockReader2 implements BlockReader {
 
   @Override
   public int readAll(byte[] buf, int offset, int len) throws IOException {
-    int n = 0;
-    for (;;) {
-      int nread = read(buf, offset + n, len - n);
-      if (nread <= 0)
-        return (n == 0) ? nread : n;
-      n += nread;
-      if (n >= len)
-        return n;
-    }
+    return BlockReaderUtil.readAll(this, buf, offset, len);
   }
 
   @Override
-  public void readFully(byte[] buf, int off, int len)
-      throws IOException {
-    int toRead = len;
-    while (toRead > 0) {
-      int ret = read(buf, off, toRead);
-      if (ret < 0) {
-        throw new IOException("Premature EOF from inputStream");
-      }
-      toRead -= ret;
-      off += ret;
-    }
+  public void readFully(byte[] buf, int off, int len) throws IOException {
+    BlockReaderUtil.readFully(this, buf, off, len);
   }
 
   /**
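
Note (appended after the patch, not part of the commit): the helpers the patch extracts into BlockReaderUtil give both reader implementations the same two end-of-stream contracts — readAll() loops until len bytes arrive and returns a short count (or -1 on immediate EOF), while readFully() treats any short read as an error. Below is a minimal standalone sketch of those loop semantics; the BlockReaderUtilSketch class and the SimpleReader stand-in for the shared read(byte[], int, int) method are hypothetical names for illustration, not from the patch.

import java.io.IOException;

// Hypothetical demo class illustrating the BlockReaderUtil loop contracts.
public class BlockReaderUtilSketch {

  // Stand-in for the single read method BlockReaderUtil relies on.
  interface SimpleReader {
    int read(byte[] buf, int off, int len) throws IOException;
  }

  // Mirrors BlockReaderUtil.readAll: loop until len bytes or EOF;
  // a short count is returned rather than treated as an error.
  static int readAll(SimpleReader reader, byte[] buf, int offset, int len)
      throws IOException {
    int n = 0;
    for (;;) {
      int nread = reader.read(buf, offset + n, len - n);
      if (nread <= 0)
        return (n == 0) ? nread : n; // -1/0 only if nothing was read at all
      n += nread;
      if (n >= len)
        return n;
    }
  }

  // Mirrors BlockReaderUtil.readFully: EOF before len bytes is an error.
  static void readFully(SimpleReader reader, byte[] buf, int off, int len)
      throws IOException {
    int toRead = len;
    while (toRead > 0) {
      int ret = reader.read(buf, off, toRead);
      if (ret < 0) {
        throw new IOException("Premature EOF from inputStream");
      }
      toRead -= ret;
      off += ret;
    }
  }

  public static void main(String[] args) throws IOException {
    final byte[] src = "0123456789".getBytes();
    // Stub that serves at most 3 bytes per call, like packet-sized reads.
    SimpleReader r = new SimpleReader() {
      private int pos = 0;
      public int read(byte[] buf, int off, int len) {
        if (pos >= src.length) return -1;
        int n = Math.min(3, Math.min(len, src.length - pos));
        System.arraycopy(src, pos, buf, off, n);
        pos += n;
        return n;
      }
    };
    byte[] out = new byte[16];
    // Asking for 16 bytes from a 10-byte source: readAll returns 10;
    // the same request through readFully would throw "Premature EOF".
    System.out.println("readAll returned " + readAll(r, out, 0, out.length));
  }
}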