HDFS-2654. svn merge -c 1213592/1213593 from trunk

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1213594 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Eli Collins 2011-12-13 08:16:40 +00:00
parent a3af423143
commit 5c779078dd
4 changed files with 134 additions and 44 deletions

View File

@ -50,6 +50,8 @@ Release 0.23.1 - UNRELEASED
HDFS-2511. Add dev script to generate HDFS protobufs. (tucu) HDFS-2511. Add dev script to generate HDFS protobufs. (tucu)
HDFS-2654. Make BlockReaderLocal not extend RemoteBlockReader2. (eli)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-2130. Switch default checksum to CRC32C. (todd) HDFS-2130. Switch default checksum to CRC32C. (todd)

View File

@ -21,6 +21,7 @@
import java.io.File; import java.io.File;
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.Socket;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
@ -37,6 +38,7 @@
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader; import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
import org.apache.hadoop.hdfs.server.datanode.FSDataset; import org.apache.hadoop.hdfs.server.datanode.FSDataset;
import org.apache.hadoop.hdfs.util.DirectBufferPool;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DataChecksum;
@ -57,8 +59,8 @@
* if security is enabled.</li> * if security is enabled.</li>
* </ul> * </ul>
*/ */
class BlockReaderLocal extends RemoteBlockReader2 { class BlockReaderLocal implements BlockReader {
public static final Log LOG = LogFactory.getLog(DFSClient.class); private static final Log LOG = LogFactory.getLog(DFSClient.class);
//Stores the cache and proxy for a local datanode. //Stores the cache and proxy for a local datanode.
private static class LocalDatanodeInfo { private static class LocalDatanodeInfo {
@ -117,13 +119,24 @@ private void removeBlockLocalPathInfo(ExtendedBlock b) {
private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>(); private static Map<Integer, LocalDatanodeInfo> localDatanodeInfoMap = new HashMap<Integer, LocalDatanodeInfo>();
private final FileInputStream dataIn; // reader for the data file private final FileInputStream dataIn; // reader for the data file
private FileInputStream checksumIn; // reader for the checksum file private FileInputStream checksumIn; // reader for the checksum file
private int offsetFromChunkBoundary; private int offsetFromChunkBoundary;
ByteBuffer dataBuff = null; private byte[] skipBuf = null;
ByteBuffer checksumBuff = null; private ByteBuffer dataBuff = null;
private ByteBuffer checksumBuff = null;
private DataChecksum checksum;
private final boolean verifyChecksum;
private static DirectBufferPool bufferPool = new DirectBufferPool();
private int bytesPerChecksum;
private int checksumSize;
/** offset in block where reader wants to actually read */
private long startOffset;
private final String filename;
/** /**
* The only way this object can be instantiated. * The only way this object can be instantiated.
@ -256,9 +269,14 @@ private BlockReaderLocal(Configuration conf, String hdfsfile,
long length, BlockLocalPathInfo pathinfo, DataChecksum checksum, long length, BlockLocalPathInfo pathinfo, DataChecksum checksum,
boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset, boolean verifyChecksum, FileInputStream dataIn, long firstChunkOffset,
FileInputStream checksumIn) throws IOException { FileInputStream checksumIn) throws IOException {
super(hdfsfile, block.getBlockPoolId(), block.getBlockId(), dataIn this.filename = hdfsfile;
.getChannel(), checksum, verifyChecksum, startOffset, firstChunkOffset, this.checksum = checksum;
length, null); this.verifyChecksum = verifyChecksum;
this.startOffset = Math.max(startOffset, 0);
bytesPerChecksum = this.checksum.getBytesPerChecksum();
checksumSize = this.checksum.getChecksumSize();
this.dataIn = dataIn; this.dataIn = dataIn;
this.checksumIn = checksumIn; this.checksumIn = checksumIn;
this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset); this.offsetFromChunkBoundary = (int) (startOffset-firstChunkOffset);
@ -322,10 +340,8 @@ public synchronized int read(byte[] buf, int off, int len) throws IOException {
readIntoBuffer(checksumIn, checksumBuff); readIntoBuffer(checksumIn, checksumBuff);
checksumBuff.flip(); checksumBuff.flip();
dataBuff.flip(); dataBuff.flip();
if (verifyChecksum) {
checksum.verifyChunkedSums(dataBuff, checksumBuff, filename, checksum.verifyChunkedSums(dataBuff, checksumBuff, filename,
this.startOffset); this.startOffset);
}
} else { } else {
dataRead = dataBuff.remaining(); dataRead = dataBuff.remaining();
} }
@ -356,9 +372,24 @@ public synchronized long skip(long n) throws IOException {
} }
if (!verifyChecksum) { if (!verifyChecksum) {
return dataIn.skip(n); return dataIn.skip(n);
} else {
return super.skip(n);
} }
// Skip by reading the data so we stay in sync with checksums.
// This could be implemented more efficiently in the future to
// skip to the beginning of the appropriate checksum chunk
// and then only read to the middle of that chunk.
if (skipBuf == null) {
skipBuf = new byte[bytesPerChecksum];
}
long nSkipped = 0;
while ( nSkipped < n ) {
int toSkip = (int)Math.min(n-nSkipped, skipBuf.length);
int ret = read(skipBuf, 0, toSkip);
if ( ret <= 0 ) {
return nSkipped;
}
nSkipped += ret;
}
return nSkipped;
} }
@Override @Override
@ -375,6 +406,27 @@ public synchronized void close() throws IOException {
bufferPool.returnBuffer(checksumBuff); bufferPool.returnBuffer(checksumBuff);
checksumBuff = null; checksumBuff = null;
} }
super.close(); startOffset = -1;
checksum = null;
}
@Override
public int readAll(byte[] buf, int offset, int len) throws IOException {
return BlockReaderUtil.readAll(this, buf, offset, len);
}
@Override
public void readFully(byte[] buf, int off, int len) throws IOException {
BlockReaderUtil.readFully(this, buf, off, len);
}
@Override
public Socket takeSocket() {
return null;
}
@Override
public boolean hasSentStatusCode() {
return false;
} }
} }

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
/**
* For sharing between the local and remote block reader implementations.
*/
class BlockReaderUtil {
/* See {@link BlockReader#readAll(byte[], int, int)} */
public static int readAll(BlockReader reader,
byte[] buf, int offset, int len) throws IOException {
int n = 0;
for (;;) {
int nread = reader.read(buf, offset + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
}
}
/* See {@link BlockReader#readFully(byte[], int, int)} */
public static void readFully(BlockReader reader,
byte[] buf, int off, int len) throws IOException {
int toRead = len;
while (toRead > 0) {
int ret = reader.read(buf, off, toRead);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream");
}
toRead -= ret;
off += ret;
}
}
}

View File

@ -85,7 +85,7 @@ public class RemoteBlockReader2 implements BlockReader {
Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read. Socket dnSock; //for now just sending the status code (e.g. checksumOk) after the read.
private ReadableByteChannel in; private ReadableByteChannel in;
protected DataChecksum checksum; private DataChecksum checksum;
private PacketHeader curHeader; private PacketHeader curHeader;
private ByteBuffer curPacketBuf = null; private ByteBuffer curPacketBuf = null;
@ -96,25 +96,24 @@ public class RemoteBlockReader2 implements BlockReader {
private long lastSeqNo = -1; private long lastSeqNo = -1;
/** offset in block where reader wants to actually read */ /** offset in block where reader wants to actually read */
protected long startOffset; private long startOffset;
protected final String filename; private final String filename;
protected static DirectBufferPool bufferPool = private static DirectBufferPool bufferPool = new DirectBufferPool();
new DirectBufferPool();
private ByteBuffer headerBuf = ByteBuffer.allocate( private ByteBuffer headerBuf = ByteBuffer.allocate(
PacketHeader.PKT_HEADER_LEN); PacketHeader.PKT_HEADER_LEN);
protected int bytesPerChecksum; private int bytesPerChecksum;
protected int checksumSize; private int checksumSize;
/** /**
* The total number of bytes we need to transfer from the DN. * The total number of bytes we need to transfer from the DN.
* This is the amount that the user has requested plus some padding * This is the amount that the user has requested plus some padding
* at the beginning so that the read can begin on a chunk boundary. * at the beginning so that the read can begin on a chunk boundary.
*/ */
protected long bytesNeededToFinish; private long bytesNeededToFinish;
protected final boolean verifyChecksum; private final boolean verifyChecksum;
private boolean sentStatusCode = false; private boolean sentStatusCode = false;
@ -389,29 +388,12 @@ public static String getFileName(final InetSocketAddress s,
@Override @Override
public int readAll(byte[] buf, int offset, int len) throws IOException { public int readAll(byte[] buf, int offset, int len) throws IOException {
int n = 0; return BlockReaderUtil.readAll(this, buf, offset, len);
for (;;) {
int nread = read(buf, offset + n, len - n);
if (nread <= 0)
return (n == 0) ? nread : n;
n += nread;
if (n >= len)
return n;
}
} }
@Override @Override
public void readFully(byte[] buf, int off, int len) public void readFully(byte[] buf, int off, int len) throws IOException {
throws IOException { BlockReaderUtil.readFully(this, buf, off, len);
int toRead = len;
while (toRead > 0) {
int ret = read(buf, off, toRead);
if (ret < 0) {
throw new IOException("Premature EOF from inputStream");
}
toRead -= ret;
off += ret;
}
} }
/** /**