HDFS-2130. Switch default checksum to CRC32C. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1196888 13f79535-47bb-0310-9956-ffa450edef68
Todd Lipcon 2011-11-03 00:35:23 +00:00
parent c21071f5b7
commit 854c1e8794
13 changed files with 357 additions and 63 deletions

View File

@@ -789,6 +789,8 @@ Release 0.23.0 - 2011-11-01
HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd)
HDFS-2130. Switch default checksum to CRC32C. (todd)
BUG FIXES
HDFS-2344. Fix the TestOfflineEditsViewer test failure in 0.23 branch.

View File

@@ -97,6 +97,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenRenewer;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
/********************************************************
@@ -139,6 +140,7 @@ public class DFSClient implements java.io.Closeable {
final int maxBlockAcquireFailures;
final int confTime;
final int ioBufferSize;
final int checksumType;
final int bytesPerChecksum;
final int writePacketSize;
final int socketTimeout;
@@ -163,6 +165,7 @@ public class DFSClient implements java.io.Closeable {
ioBufferSize = conf.getInt(
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY,
CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT);
checksumType = getChecksumType(conf);
bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY,
DFS_BYTES_PER_CHECKSUM_DEFAULT);
socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY,
@@ -190,6 +193,26 @@ public class DFSClient implements java.io.Closeable {
DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT);
uMask = FsPermission.getUMask(conf);
}
private int getChecksumType(Configuration conf) {
String checksum = conf.get(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY,
DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT);
if ("CRC32".equals(checksum)) {
return DataChecksum.CHECKSUM_CRC32;
} else if ("CRC32C".equals(checksum)) {
return DataChecksum.CHECKSUM_CRC32C;
} else if ("NULL".equals(checksum)) {
return DataChecksum.CHECKSUM_NULL;
} else {
LOG.warn("Bad checksum type: " + checksum + ". Using default.");
return DataChecksum.CHECKSUM_CRC32C;
}
}
private DataChecksum createChecksum() {
return DataChecksum.newDataChecksum(
checksumType, bytesPerChecksum);
}
}
Conf getConf() {
@@ -755,7 +778,7 @@ public class DFSClient implements java.io.Closeable {
}
final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag,
createParent, replication, blockSize, progress, buffersize,
dfsClientConf.bytesPerChecksum);
dfsClientConf.createChecksum());
leaserenewer.put(src, result, this);
return result;
}
@@ -799,9 +822,12 @@ public class DFSClient implements java.io.Closeable {
CreateFlag.validate(flag);
DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress);
if (result == null) {
DataChecksum checksum = DataChecksum.newDataChecksum(
dfsClientConf.checksumType,
bytesPerChecksum);
result = new DFSOutputStream(this, src, absPermission,
flag, createParent, replication, blockSize, progress, buffersize,
bytesPerChecksum);
checksum);
}
leaserenewer.put(src, result, this);
return result;
@@ -859,7 +885,7 @@ public class DFSClient implements java.io.Closeable {
UnresolvedPathException.class);
}
return new DFSOutputStream(this, src, buffersize, progress,
lastBlock, stat, dfsClientConf.bytesPerChecksum);
lastBlock, stat, dfsClientConf.createChecksum());
}
/**

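A small illustration (not part of the patch) of how getChecksumType() above resolves the new key. The comparison is an exact string match, so unrecognized or differently-cased values log a warning and fall back to CRC32C:

// Illustrative only; restates the mapping implemented by getChecksumType().
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32");   // -> DataChecksum.CHECKSUM_CRC32
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");  // -> DataChecksum.CHECKSUM_CRC32C (the new default)
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "NULL");    // -> DataChecksum.CHECKSUM_NULL (no checksums)
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "crc32c");  // no exact match -> warning, falls back to CRC32C
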
View File

@@ -38,6 +38,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096;
public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum";
public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512;
public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type";
public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C";
public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size";
public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024;
public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable";
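A hedged sketch of how a client might combine the two new keys to pick the checksum for newly written files; the cache-disable setting mirrors the new TestAppendDifferentChecksum below and is only needed when mixing several checksum configurations in one JVM. The cluster setup itself is assumed, not shown:

// Sketch only; assumes fs.defaultFS already points at the cluster.
Configuration conf = new Configuration();
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, "CRC32C");      // default since this change
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, 512);   // unchanged default chunk size
conf.set("fs.hdfs.impl.disable.cache", "true");               // avoid reusing a cached FS with other settings
FileSystem fs = FileSystem.get(conf);                         // files created via this FS use CRC32C / 512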

View File

@@ -74,7 +74,6 @@ import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.PureJavaCrc32;
/****************************************************************
@@ -1206,8 +1205,9 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
}
private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress,
int bytesPerChecksum, short replication) throws IOException {
super(new PureJavaCrc32(), bytesPerChecksum, 4);
DataChecksum checksum, short replication) throws IOException {
super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize());
int bytesPerChecksum = checksum.getBytesPerChecksum();
this.dfsClient = dfsClient;
this.src = src;
this.blockSize = blockSize;
@@ -1225,8 +1225,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
"multiple of io.bytes.per.checksum");
}
checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32,
bytesPerChecksum);
this.checksum = checksum;
}
/**
@@ -1235,11 +1234,12 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
*/
DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize, Progressable progress,
int buffersize, int bytesPerChecksum)
int buffersize, DataChecksum checksum)
throws IOException {
this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication);
this(dfsClient, src, blockSize, progress, checksum, replication);
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
try {
dfsClient.namenode.create(
@@ -1264,8 +1264,8 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
*/
DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress,
LocatedBlock lastBlock, HdfsFileStatus stat,
int bytesPerChecksum) throws IOException {
this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication());
DataChecksum checksum) throws IOException {
this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication());
initialFileSize = stat.getLen(); // length of file when opened
//
@@ -1274,9 +1274,10 @@ class DFSOutputStream extends FSOutputSummer implements Syncable {
if (lastBlock != null) {
// indicate that we are appending to an existing block
bytesCurBlock = lastBlock.getBlockSize();
streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum);
streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum());
} else {
computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum);
computePacketChunkSize(dfsClient.getConf().writePacketSize,
checksum.getBytesPerChecksum());
streamer = new DataStreamer();
}
streamer.start();
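The constructor changes above work because DataChecksum carries both the algorithm and the chunk geometry, so a single object replaces the old bytesPerChecksum int. A standalone sketch (values illustrative) of the accessors DFSOutputStream now leans on:

DataChecksum sum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32C, 512);

int bytesPerChunk = sum.getBytesPerChecksum();  // 512: data bytes covered by each checksum
int checksumSize  = sum.getChecksumSize();      // 4:   checksum bytes written per chunk

// DataChecksum also implements java.util.zip.Checksum, which is what the
// super(checksum, bytesPerChunk, checksumSize) call into FSOutputSummer expects.
sum.update(new byte[bytesPerChunk], 0, bytesPerChunk);
long crc = sum.getValue();
sum.reset();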

View File

@@ -18,12 +18,15 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.BufferedInputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
@@ -88,6 +91,18 @@ class BlockMetadataHeader {
}
}
/**
* Read the header at the beginning of the given block meta file.
* The current file position will be altered by this method.
* If an error occurs, the file is <em>not</em> closed.
*/
static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException {
byte[] buf = new byte[getHeaderSize()];
raf.seek(0);
raf.readFully(buf, 0, buf.length);
return readHeader(new DataInputStream(new ByteArrayInputStream(buf)));
}
// Version is already read.
private static BlockMetadataHeader readHeader(short version, DataInputStream in)
throws IOException {

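A usage sketch of the new readHeader(RandomAccessFile) overload; this is essentially what ReplicaInPipeline.createStreams() below does to learn which checksum an existing block was written with before allowing an append. The meta file path and error handling here are illustrative only:

// Assumes java.io.* plus org.apache.hadoop.io.IOUtils and org.apache.hadoop.util.DataChecksum.
RandomAccessFile metaRAF = new RandomAccessFile(new File("blk_1234_567.meta"), "r");  // hypothetical path
try {
  BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
  DataChecksum onDisk = header.getChecksum();        // checksum the block is stored with
  int bytesPerChunk = onDisk.getBytesPerChecksum();  // used to size and validate the meta file
  // ... decide whether an incoming writer's checksum is compatible ...
} finally {
  IOUtils.closeStream(metaRAF);   // readHeader() deliberately leaves the file open
}
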
View File

@@ -63,7 +63,15 @@ class BlockReceiver implements Closeable {
private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024;
private DataInputStream in = null; // from where data are read
private DataChecksum checksum; // from where chunks of a block can be read
private DataChecksum clientChecksum; // checksum used by client
private DataChecksum diskChecksum; // checksum we write to disk
/**
* In the case that the client is writing with a different
* checksum polynomial than the block is stored with on disk,
* the DataNode needs to recalculate checksums before writing.
*/
private boolean needsChecksumTranslation;
private OutputStream out = null; // to block file at local disk
private FileDescriptor outFd;
private OutputStream cout = null; // output stream for checksum file
@@ -177,33 +185,35 @@
" while receiving block " + block + " from " + inAddr);
}
}
// read checksum meta information
this.checksum = requestedChecksum;
this.bytesPerChecksum = checksum.getBytesPerChecksum();
this.checksumSize = checksum.getChecksumSize();
this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites();
this.syncBehindWrites = datanode.shouldSyncBehindWrites();
final boolean isCreate = isDatanode || isTransfer
|| stage == BlockConstructionStage.PIPELINE_SETUP_CREATE;
streams = replicaInfo.createStreams(isCreate,
this.bytesPerChecksum, this.checksumSize);
if (streams != null) {
this.out = streams.dataOut;
if (out instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)out).getFD();
} else {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
}
this.cout = streams.checksumOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
// write data chunk header if creating a new replica
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, checksum);
}
streams = replicaInfo.createStreams(isCreate, requestedChecksum);
assert streams != null : "null streams!";
// read checksum meta information
this.clientChecksum = requestedChecksum;
this.diskChecksum = streams.getChecksum();
this.needsChecksumTranslation = !clientChecksum.equals(diskChecksum);
this.bytesPerChecksum = diskChecksum.getBytesPerChecksum();
this.checksumSize = diskChecksum.getChecksumSize();
this.out = streams.dataOut;
if (out instanceof FileOutputStream) {
this.outFd = ((FileOutputStream)out).getFD();
} else {
LOG.warn("Could not get file descriptor for outputstream of class " +
out.getClass());
}
this.cout = streams.checksumOut;
this.checksumOut = new DataOutputStream(new BufferedOutputStream(
streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE));
// write data chunk header if creating a new replica
if (isCreate) {
BlockMetadataHeader.writeHeader(checksumOut, diskChecksum);
}
} catch (ReplicaAlreadyExistsException bae) {
throw bae;
} catch (ReplicaNotFoundException bne) {
@@ -315,9 +325,9 @@
while (len > 0) {
int chunkLen = Math.min(len, bytesPerChecksum);
checksum.update(dataBuf, dataOff, chunkLen);
clientChecksum.update(dataBuf, dataOff, chunkLen);
if (!checksum.compare(checksumBuf, checksumOff)) {
if (!clientChecksum.compare(checksumBuf, checksumOff)) {
if (srcDataNode != null) {
try {
LOG.info("report corrupt block " + block + " from datanode " +
@@ -334,12 +344,32 @@
"while writing " + block + " from " + inAddr);
}
checksum.reset();
clientChecksum.reset();
dataOff += chunkLen;
checksumOff += checksumSize;
len -= chunkLen;
}
}
/**
* Translate CRC chunks from the client's checksum implementation
* to the disk checksum implementation.
*
* This does not verify the original checksums, under the assumption
* that they have already been validated.
*/
private void translateChunks( byte[] dataBuf, int dataOff, int len,
byte[] checksumBuf, int checksumOff )
throws IOException {
if (len == 0) return;
int numChunks = (len - 1)/bytesPerChecksum + 1;
diskChecksum.calculateChunkedSums(
ByteBuffer.wrap(dataBuf, dataOff, len),
ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize));
}
/**
* Makes sure buf.position() is zero without modifying buf.remaining().
@@ -583,9 +613,16 @@
* protocol includes acks and only the last datanode needs to verify
* checksum.
*/
if (mirrorOut == null || isDatanode) {
if (mirrorOut == null || isDatanode || needsChecksumTranslation) {
verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
if (needsChecksumTranslation) {
// overwrite the checksums in the packet buffer with the
// appropriate polynomial for the disk storage.
translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff);
}
}
// by this point, the data in the buffer uses the disk checksum
byte[] lastChunkChecksum;
@@ -807,7 +844,7 @@
// find offset of the beginning of partial chunk.
//
int sizePartialChunk = (int) (blkoff % bytesPerChecksum);
int checksumSize = checksum.getChecksumSize();
int checksumSize = diskChecksum.getChecksumSize();
blkoff = blkoff - sizePartialChunk;
LOG.info("computePartialChunkCrc sizePartialChunk " +
sizePartialChunk +
@@ -832,7 +869,8 @@
}
// compute crc of partial chunk from data read in the block file.
partialCrc = new PureJavaCrc32();
partialCrc = DataChecksum.newDataChecksum(
diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
partialCrc.update(buf, 0, sizePartialChunk);
LOG.info("Read in partial CRC chunk from disk for block " + block);

View File

@@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
/**
@@ -158,15 +159,23 @@ public interface FSDatasetInterface extends FSDatasetMBean {
static class BlockWriteStreams {
OutputStream dataOut;
OutputStream checksumOut;
BlockWriteStreams(OutputStream dOut, OutputStream cOut) {
DataChecksum checksum;
BlockWriteStreams(OutputStream dOut, OutputStream cOut,
DataChecksum checksum) {
dataOut = dOut;
checksumOut = cOut;
this.checksum = checksum;
}
void close() throws IOException {
IOUtils.closeStream(dataOut);
IOUtils.closeStream(checksumOut);
}
DataChecksum getChecksum() {
return checksum;
}
}
/**

View File

@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import java.io.DataInputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
@@ -169,7 +170,7 @@ class ReplicaInPipeline extends ReplicaInfo
@Override // ReplicaInPipelineInterface
public BlockWriteStreams createStreams(boolean isCreate,
int bytesPerChunk, int checksumSize) throws IOException {
DataChecksum requestedChecksum) throws IOException {
File blockFile = getBlockFile();
File metaFile = getMetaFile();
if (DataNode.LOG.isDebugEnabled()) {
@@ -180,30 +181,64 @@ class ReplicaInPipeline extends ReplicaInfo
}
long blockDiskSize = 0L;
long crcDiskSize = 0L;
if (!isCreate) { // check on disk file
blockDiskSize = bytesOnDisk;
crcDiskSize = BlockMetadataHeader.getHeaderSize() +
(blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
if (blockDiskSize>0 &&
(blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
throw new IOException("Corrupted block: " + this);
// the checksum that should actually be used -- this
// may differ from requestedChecksum for appends.
DataChecksum checksum;
RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw");
if (!isCreate) {
// For append or recovery, we must enforce the existing checksum.
// Also, verify that the file has correct lengths, etc.
boolean checkedMeta = false;
try {
BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF);
checksum = header.getChecksum();
if (checksum.getBytesPerChecksum() !=
requestedChecksum.getBytesPerChecksum()) {
throw new IOException("Client requested checksum " +
requestedChecksum + " when appending to an existing block " +
"with different chunk size: " + checksum);
}
int bytesPerChunk = checksum.getBytesPerChecksum();
int checksumSize = checksum.getChecksumSize();
blockDiskSize = bytesOnDisk;
crcDiskSize = BlockMetadataHeader.getHeaderSize() +
(blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize;
if (blockDiskSize>0 &&
(blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) {
throw new IOException("Corrupted block: " + this);
}
checkedMeta = true;
} finally {
if (!checkedMeta) {
// clean up in case of exceptions.
IOUtils.closeStream(metaRAF);
}
}
} else {
// for create, we can use the requested checksum
checksum = requestedChecksum;
}
FileOutputStream blockOut = null;
FileOutputStream crcOut = null;
try {
blockOut = new FileOutputStream(
new RandomAccessFile( blockFile, "rw" ).getFD() );
crcOut = new FileOutputStream(
new RandomAccessFile( metaFile, "rw" ).getFD() );
crcOut = new FileOutputStream(metaRAF.getFD() );
if (!isCreate) {
blockOut.getChannel().position(blockDiskSize);
crcOut.getChannel().position(crcDiskSize);
}
return new BlockWriteStreams(blockOut, crcOut);
return new BlockWriteStreams(blockOut, crcOut, checksum);
} catch (IOException e) {
IOUtils.closeStream(blockOut);
IOUtils.closeStream(crcOut);
IOUtils.closeStream(metaRAF);
throw e;
}
}
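The compatibility rule encoded above: on append, the checksum algorithm may differ from what is on disk (BlockReceiver translates incoming checksums), but the bytes-per-checksum must match. A condensed restatement with hypothetical variable names:

// onDisk comes from BlockMetadataHeader.readHeader(metaRAF); requested is what the client asked for.
if (onDisk.getBytesPerChecksum() != requested.getBytesPerChecksum()) {
  // Different chunk geometry cannot be reconciled chunk-by-chunk, so the append is rejected.
  throw new IOException("Client requested checksum " + requested +
      " when appending to an existing block with different chunk size: " + onDisk);
}
// Same chunk size but a different checksum type is fine: the on-disk checksum wins,
// and BlockReceiver recomputes the client's checksums to match it before writing.
DataChecksum checksum = onDisk;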

View File

@@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.IOException;
import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams;
import org.apache.hadoop.util.DataChecksum;
/**
* This defines the interface of a replica in Pipeline that's being written to
@@ -61,11 +62,10 @@ interface ReplicaInPipelineInterface extends Replica {
* one for block file and one for CRC file
*
* @param isCreate if it is for creation
* @param bytePerChunk number of bytes per CRC chunk
* @param checksumSize number of bytes per checksum
* @param requestedChecksum the checksum the writer would prefer to use
* @return output streams for writing
* @throws IOException if any error occurs
*/
public BlockWriteStreams createStreams(boolean isCreate,
int bytesPerChunk, int checksumSize) throws IOException;
DataChecksum requestedChecksum) throws IOException;
}

View File

@@ -0,0 +1,165 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs;
import java.io.IOException;
import java.util.Random;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;
/**
* Test cases for trying to append to a file with a different
* checksum than the file was originally written with.
*/
public class TestAppendDifferentChecksum {
private static final int SEGMENT_LENGTH = 1500;
// run the randomized test for 5 seconds
private static final long RANDOM_TEST_RUNTIME = 5000;
private static MiniDFSCluster cluster;
private static FileSystem fs;
@BeforeClass
public static void setupCluster() throws IOException {
Configuration conf = new HdfsConfiguration();
conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096);
// disable block scanner, since otherwise this test can trigger
// HDFS-2525, which is a different bug than we're trying to unit test
// here! When HDFS-2525 is fixed, this can be removed.
conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1);
conf.set("fs.hdfs.impl.disable.cache", "true");
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(1)
.build();
fs = cluster.getFileSystem();
}
@AfterClass
public static void teardown() throws IOException {
if (cluster != null) {
cluster.shutdown();
}
}
/**
* This test does not run, since switching chunksize with append
* is not implemented. Please see HDFS-2130 for a discussion of the
* difficulties in doing so.
*/
@Test
@Ignore("this is not implemented! See HDFS-2130")
public void testSwitchChunkSize() throws IOException {
FileSystem fsWithSmallChunk = createFsWithChecksum("CRC32", 512);
FileSystem fsWithBigChunk = createFsWithChecksum("CRC32", 1024);
Path p = new Path("/testSwitchChunkSize");
appendWithTwoFs(p, fsWithSmallChunk, fsWithBigChunk);
AppendTestUtil.check(fsWithSmallChunk, p, SEGMENT_LENGTH * 2);
AppendTestUtil.check(fsWithBigChunk, p, SEGMENT_LENGTH * 2);
}
/**
* Simple unit test which writes some data with one algorithm,
* then appends with another.
*/
@Test
public void testSwitchAlgorithms() throws IOException {
FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512);
FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512);
Path p = new Path("/testSwitchAlgorithms");
appendWithTwoFs(p, fsWithCrc32, fsWithCrc32C);
// Regardless of which FS is used to read, it should pick up
// the on-disk checksum!
AppendTestUtil.check(fsWithCrc32C, p, SEGMENT_LENGTH * 2);
AppendTestUtil.check(fsWithCrc32, p, SEGMENT_LENGTH * 2);
}
/**
* Test which randomly alternates between appending with
* CRC32 and with CRC32C, crossing several block boundaries.
* Then, checks that all of the data can be read back correctly.
*/
@Test(timeout=RANDOM_TEST_RUNTIME*2)
public void testAlgoSwitchRandomized() throws IOException {
FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512);
FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512);
Path p = new Path("/testAlgoSwitchRandomized");
long seed = System.currentTimeMillis();
System.out.println("seed: " + seed);
Random r = new Random(seed);
// Create empty to start
IOUtils.closeStream(fsWithCrc32.create(p));
long st = System.currentTimeMillis();
int len = 0;
while (System.currentTimeMillis() - st < RANDOM_TEST_RUNTIME) {
int thisLen = r.nextInt(500);
FileSystem fs = (r.nextBoolean() ? fsWithCrc32 : fsWithCrc32C);
FSDataOutputStream stm = fs.append(p);
try {
AppendTestUtil.write(stm, len, thisLen);
} finally {
stm.close();
}
len += thisLen;
}
AppendTestUtil.check(fsWithCrc32, p, len);
AppendTestUtil.check(fsWithCrc32C, p, len);
}
private FileSystem createFsWithChecksum(String type, int bytes)
throws IOException {
Configuration conf = new Configuration(fs.getConf());
conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, type);
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytes);
return FileSystem.get(conf);
}
private void appendWithTwoFs(Path p, FileSystem fs1, FileSystem fs2)
throws IOException {
FSDataOutputStream stm = fs1.create(p);
try {
AppendTestUtil.write(stm, 0, SEGMENT_LENGTH);
} finally {
stm.close();
}
stm = fs2.append(p);
try {
AppendTestUtil.write(stm, SEGMENT_LENGTH, SEGMENT_LENGTH);
} finally {
stm.close();
}
}
}

View File

@@ -74,7 +74,7 @@ public class TestDataTransferProtocol extends TestCase {
"org.apache.hadoop.hdfs.TestDataTransferProtocol");
private static final DataChecksum DEFAULT_CHECKSUM =
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32C, 512);
DatanodeID datanode;
InetSocketAddress dnAddr;

View File

@@ -204,13 +204,13 @@ public class SimulatedFSDataset implements FSDatasetInterface, Configurable{
@Override
synchronized public BlockWriteStreams createStreams(boolean isCreate,
int bytesPerChunk, int checksumSize) throws IOException {
DataChecksum requestedChecksum) throws IOException {
if (finalized) {
throw new IOException("Trying to write to a finalized replica "
+ theBlock);
} else {
SimulatedOutputStream crcStream = new SimulatedOutputStream();
return new BlockWriteStreams(oStream, crcStream);
return new BlockWriteStreams(oStream, crcStream, requestedChecksum);
}
}

View File

@@ -64,7 +64,8 @@ public class TestSimulatedFSDataset extends TestCase {
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b);
BlockWriteStreams out = bInfo.createStreams(true, 512, 4);
BlockWriteStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512));
try {
OutputStream dataOut = out.dataOut;
assertEquals(0, fsdataset.getLength(b));