diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 115bcc15920..a8dca6665bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -789,6 +789,8 @@ Release 0.23.0 - 2011-11-01 HDFS-2465. Add HDFS support for fadvise readahead and drop-behind. (todd) + HDFS-2130. Switch default checksum to CRC32C. (todd) + BUG FIXES HDFS-2344. Fix the TestOfflineEditsViewer test failure in 0.23 branch. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java index 973c2182ea7..f55ae4c0864 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSClient.java @@ -97,6 +97,7 @@ import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager.InvalidToken; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenRenewer; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; /******************************************************** @@ -139,6 +140,7 @@ public class DFSClient implements java.io.Closeable { final int maxBlockAcquireFailures; final int confTime; final int ioBufferSize; + final int checksumType; final int bytesPerChecksum; final int writePacketSize; final int socketTimeout; @@ -163,6 +165,7 @@ public class DFSClient implements java.io.Closeable { ioBufferSize = conf.getInt( CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY, CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT); + checksumType = getChecksumType(conf); bytesPerChecksum = conf.getInt(DFS_BYTES_PER_CHECKSUM_KEY, DFS_BYTES_PER_CHECKSUM_DEFAULT); socketTimeout = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, @@ -190,6 +193,26 @@ public class DFSClient implements java.io.Closeable { DFS_CLIENT_BLOCK_WRITE_LOCATEFOLLOWINGBLOCK_RETRIES_DEFAULT); uMask = FsPermission.getUMask(conf); } + + private int getChecksumType(Configuration conf) { + String checksum = conf.get(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, + DFSConfigKeys.DFS_CHECKSUM_TYPE_DEFAULT); + if ("CRC32".equals(checksum)) { + return DataChecksum.CHECKSUM_CRC32; + } else if ("CRC32C".equals(checksum)) { + return DataChecksum.CHECKSUM_CRC32C; + } else if ("NULL".equals(checksum)) { + return DataChecksum.CHECKSUM_NULL; + } else { + LOG.warn("Bad checksum type: " + checksum + ". 
Using default."); + return DataChecksum.CHECKSUM_CRC32C; + } + } + + private DataChecksum createChecksum() { + return DataChecksum.newDataChecksum( + checksumType, bytesPerChecksum); + } } Conf getConf() { @@ -755,7 +778,7 @@ public class DFSClient implements java.io.Closeable { } final DFSOutputStream result = new DFSOutputStream(this, src, masked, flag, createParent, replication, blockSize, progress, buffersize, - dfsClientConf.bytesPerChecksum); + dfsClientConf.createChecksum()); leaserenewer.put(src, result, this); return result; } @@ -799,9 +822,12 @@ public class DFSClient implements java.io.Closeable { CreateFlag.validate(flag); DFSOutputStream result = primitiveAppend(src, flag, buffersize, progress); if (result == null) { + DataChecksum checksum = DataChecksum.newDataChecksum( + dfsClientConf.checksumType, + bytesPerChecksum); result = new DFSOutputStream(this, src, absPermission, flag, createParent, replication, blockSize, progress, buffersize, - bytesPerChecksum); + checksum); } leaserenewer.put(src, result, this); return result; @@ -859,7 +885,7 @@ public class DFSClient implements java.io.Closeable { UnresolvedPathException.class); } return new DFSOutputStream(this, src, buffersize, progress, - lastBlock, stat, dfsClientConf.bytesPerChecksum); + lastBlock, stat, dfsClientConf.createChecksum()); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index dd39676e750..461cc3b178d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -38,6 +38,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_STREAM_BUFFER_SIZE_DEFAULT = 4096; public static final String DFS_BYTES_PER_CHECKSUM_KEY = "dfs.bytes-per-checksum"; public static final int DFS_BYTES_PER_CHECKSUM_DEFAULT = 512; + public static final String DFS_CHECKSUM_TYPE_KEY = "dfs.checksum.type"; + public static final String DFS_CHECKSUM_TYPE_DEFAULT = "CRC32C"; public static final String DFS_CLIENT_WRITE_PACKET_SIZE_KEY = "dfs.client-write-packet-size"; public static final int DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT = 64*1024; public static final String DFS_CLIENT_WRITE_REPLACE_DATANODE_ON_FAILURE_ENABLE_KEY = "dfs.client.block.write.replace-datanode-on-failure.enable"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java index 174da6841d7..8422ecca007 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSOutputStream.java @@ -74,7 +74,6 @@ import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.Progressable; -import org.apache.hadoop.util.PureJavaCrc32; /**************************************************************** @@ -1206,8 +1205,9 @@ class DFSOutputStream extends FSOutputSummer implements Syncable { } private DFSOutputStream(DFSClient dfsClient, String src, long blockSize, Progressable progress, - int bytesPerChecksum, short replication) throws IOException { - super(new PureJavaCrc32(), bytesPerChecksum, 4); + DataChecksum checksum, short 
replication) throws IOException { + super(checksum, checksum.getBytesPerChecksum(), checksum.getChecksumSize()); + int bytesPerChecksum = checksum.getBytesPerChecksum(); this.dfsClient = dfsClient; this.src = src; this.blockSize = blockSize; @@ -1225,8 +1225,7 @@ class DFSOutputStream extends FSOutputSummer implements Syncable { "multiple of io.bytes.per.checksum"); } - checksum = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, - bytesPerChecksum); + this.checksum = checksum; } /** @@ -1235,11 +1234,12 @@ class DFSOutputStream extends FSOutputSummer implements Syncable { */ DFSOutputStream(DFSClient dfsClient, String src, FsPermission masked, EnumSet flag, boolean createParent, short replication, long blockSize, Progressable progress, - int buffersize, int bytesPerChecksum) + int buffersize, DataChecksum checksum) throws IOException { - this(dfsClient, src, blockSize, progress, bytesPerChecksum, replication); + this(dfsClient, src, blockSize, progress, checksum, replication); - computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); + computePacketChunkSize(dfsClient.getConf().writePacketSize, + checksum.getBytesPerChecksum()); try { dfsClient.namenode.create( @@ -1264,8 +1264,8 @@ class DFSOutputStream extends FSOutputSummer implements Syncable { */ DFSOutputStream(DFSClient dfsClient, String src, int buffersize, Progressable progress, LocatedBlock lastBlock, HdfsFileStatus stat, - int bytesPerChecksum) throws IOException { - this(dfsClient, src, stat.getBlockSize(), progress, bytesPerChecksum, stat.getReplication()); + DataChecksum checksum) throws IOException { + this(dfsClient, src, stat.getBlockSize(), progress, checksum, stat.getReplication()); initialFileSize = stat.getLen(); // length of file when opened // @@ -1274,9 +1274,10 @@ class DFSOutputStream extends FSOutputSummer implements Syncable { if (lastBlock != null) { // indicate that we are appending to an existing block bytesCurBlock = lastBlock.getBlockSize(); - streamer = new DataStreamer(lastBlock, stat, bytesPerChecksum); + streamer = new DataStreamer(lastBlock, stat, checksum.getBytesPerChecksum()); } else { - computePacketChunkSize(dfsClient.getConf().writePacketSize, bytesPerChecksum); + computePacketChunkSize(dfsClient.getConf().writePacketSize, + checksum.getBytesPerChecksum()); streamer = new DataStreamer(); } streamer.start(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java index 3254e982cdf..ad5c6d878a2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockMetadataHeader.java @@ -18,12 +18,15 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.BufferedInputStream; +import java.io.ByteArrayInputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.io.RandomAccessFile; +import org.apache.commons.httpclient.methods.GetMethod; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.DataChecksum; @@ -88,6 +91,18 @@ class BlockMetadataHeader { } } + /** + * Read the header at the beginning of the given block meta file. + * The current file position will be altered by this method. 
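DFSOutputStream no longer hard-codes PureJavaCrc32 and a 4-byte checksum width; both values now come from the DataChecksum handed in by DFSClient. A self-contained sketch of the numbers involved for the new default (class name is illustrative):

```java
import org.apache.hadoop.util.DataChecksum;

public class ChecksumSizesSketch {
  public static void main(String[] args) {
    DataChecksum sum = DataChecksum.newDataChecksum(
        DataChecksum.CHECKSUM_CRC32C, 512);
    // These are the two values the constructor now passes to FSOutputSummer
    // instead of the old hard-coded (bytesPerChecksum, 4).
    System.out.println("bytes per checksum: " + sum.getBytesPerChecksum()); // 512
    System.out.println("checksum size:      " + sum.getChecksumSize());     // 4 (CRC32 and CRC32C)
  }
}
```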
+ * If an error occurs, the file is not closed. + */ + static BlockMetadataHeader readHeader(RandomAccessFile raf) throws IOException { + byte[] buf = new byte[getHeaderSize()]; + raf.seek(0); + raf.readFully(buf, 0, buf.length); + return readHeader(new DataInputStream(new ByteArrayInputStream(buf))); + } + // Version is already read. private static BlockMetadataHeader readHeader(short version, DataInputStream in) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 94920fd5bc3..9277956e1f1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -63,7 +63,15 @@ class BlockReceiver implements Closeable { private static final long CACHE_DROP_LAG_BYTES = 8 * 1024 * 1024; private DataInputStream in = null; // from where data are read - private DataChecksum checksum; // from where chunks of a block can be read + private DataChecksum clientChecksum; // checksum used by client + private DataChecksum diskChecksum; // checksum we write to disk + + /** + * In the case that the client is writing with a different + * checksum polynomial than the block is stored with on disk, + * the DataNode needs to recalculate checksums before writing. + */ + private boolean needsChecksumTranslation; private OutputStream out = null; // to block file at local disk private FileDescriptor outFd; private OutputStream cout = null; // output stream for cehcksum file @@ -177,33 +185,35 @@ class BlockReceiver implements Closeable { " while receiving block " + block + " from " + inAddr); } } - // read checksum meta information - this.checksum = requestedChecksum; - this.bytesPerChecksum = checksum.getBytesPerChecksum(); - this.checksumSize = checksum.getChecksumSize(); this.dropCacheBehindWrites = datanode.shouldDropCacheBehindWrites(); this.syncBehindWrites = datanode.shouldSyncBehindWrites(); final boolean isCreate = isDatanode || isTransfer || stage == BlockConstructionStage.PIPELINE_SETUP_CREATE; - streams = replicaInfo.createStreams(isCreate, - this.bytesPerChecksum, this.checksumSize); - if (streams != null) { - this.out = streams.dataOut; - if (out instanceof FileOutputStream) { - this.outFd = ((FileOutputStream)out).getFD(); - } else { - LOG.warn("Could not get file descriptor for outputstream of class " + - out.getClass()); - } - this.cout = streams.checksumOut; - this.checksumOut = new DataOutputStream(new BufferedOutputStream( - streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE)); - // write data chunk header if creating a new replica - if (isCreate) { - BlockMetadataHeader.writeHeader(checksumOut, checksum); - } + streams = replicaInfo.createStreams(isCreate, requestedChecksum); + assert streams != null : "null streams!"; + + // read checksum meta information + this.clientChecksum = requestedChecksum; + this.diskChecksum = streams.getChecksum(); + this.needsChecksumTranslation = !clientChecksum.equals(diskChecksum); + this.bytesPerChecksum = diskChecksum.getBytesPerChecksum(); + this.checksumSize = diskChecksum.getChecksumSize(); + + this.out = streams.dataOut; + if (out instanceof FileOutputStream) { + this.outFd = ((FileOutputStream)out).getFD(); + } else { + LOG.warn("Could not get file descriptor for outputstream of class " + + 
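A sketch of using the new readHeader(RandomAccessFile) overload to find out which checksum an existing replica was written with. BlockMetadataHeader and the overload are package-private, so the sketch assumes code living in the same datanode package; the class name and file handling are illustrative:

```java
package org.apache.hadoop.hdfs.server.datanode;

import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;

import org.apache.hadoop.util.DataChecksum;

class MetaHeaderSketch {
  // Reads the checksum recorded at the front of a replica's meta file.
  static DataChecksum diskChecksum(File metaFile) throws IOException {
    RandomAccessFile raf = new RandomAccessFile(metaFile, "r");
    try {
      return BlockMetadataHeader.readHeader(raf).getChecksum();
    } finally {
      raf.close();
    }
  }
}
```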
out.getClass()); } + this.cout = streams.checksumOut; + this.checksumOut = new DataOutputStream(new BufferedOutputStream( + streams.checksumOut, HdfsConstants.SMALL_BUFFER_SIZE)); + // write data chunk header if creating a new replica + if (isCreate) { + BlockMetadataHeader.writeHeader(checksumOut, diskChecksum); + } } catch (ReplicaAlreadyExistsException bae) { throw bae; } catch (ReplicaNotFoundException bne) { @@ -315,9 +325,9 @@ class BlockReceiver implements Closeable { while (len > 0) { int chunkLen = Math.min(len, bytesPerChecksum); - checksum.update(dataBuf, dataOff, chunkLen); + clientChecksum.update(dataBuf, dataOff, chunkLen); - if (!checksum.compare(checksumBuf, checksumOff)) { + if (!clientChecksum.compare(checksumBuf, checksumOff)) { if (srcDataNode != null) { try { LOG.info("report corrupt block " + block + " from datanode " + @@ -334,12 +344,32 @@ class BlockReceiver implements Closeable { "while writing " + block + " from " + inAddr); } - checksum.reset(); + clientChecksum.reset(); dataOff += chunkLen; checksumOff += checksumSize; len -= chunkLen; } } + + + /** + * Translate CRC chunks from the client's checksum implementation + * to the disk checksum implementation. + * + * This does not verify the original checksums, under the assumption + * that they have already been validated. + */ + private void translateChunks( byte[] dataBuf, int dataOff, int len, + byte[] checksumBuf, int checksumOff ) + throws IOException { + if (len == 0) return; + + int numChunks = (len - 1)/bytesPerChecksum + 1; + + diskChecksum.calculateChunkedSums( + ByteBuffer.wrap(dataBuf, dataOff, len), + ByteBuffer.wrap(checksumBuf, checksumOff, numChunks * checksumSize)); + } /** * Makes sure buf.position() is zero without modifying buf.remaining(). @@ -583,9 +613,16 @@ class BlockReceiver implements Closeable { * protocol includes acks and only the last datanode needs to verify * checksum. */ - if (mirrorOut == null || isDatanode) { + if (mirrorOut == null || isDatanode || needsChecksumTranslation) { verifyChunks(pktBuf, dataOff, len, pktBuf, checksumOff); + if (needsChecksumTranslation) { + // overwrite the checksums in the packet buffer with the + // appropriate polynomial for the disk storage. + translateChunks(pktBuf, dataOff, len, pktBuf, checksumOff); + } } + + // by this point, the data in the buffer uses the disk checksum byte[] lastChunkChecksum; @@ -807,7 +844,7 @@ class BlockReceiver implements Closeable { // find offset of the beginning of partial chunk. // int sizePartialChunk = (int) (blkoff % bytesPerChecksum); - int checksumSize = checksum.getChecksumSize(); + int checksumSize = diskChecksum.getChecksumSize(); blkoff = blkoff - sizePartialChunk; LOG.info("computePartialChunkCrc sizePartialChunk " + sizePartialChunk + @@ -832,7 +869,8 @@ class BlockReceiver implements Closeable { } // compute crc of partial chunk from data read in the block file. 
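translateChunks() leans on DataChecksum.calculateChunkedSums() to recompute a whole packet's worth of checksums in one call; in the receiver the same packet buffer is reused, so the client's CRC32 values are simply overwritten with CRC32C ones. A self-contained sketch with illustrative buffer sizes:

```java
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.util.DataChecksum;

public class TranslateChunksSketch {
  public static void main(String[] args) throws IOException {
    DataChecksum disk = DataChecksum.newDataChecksum(
        DataChecksum.CHECKSUM_CRC32C, 512);
    byte[] data = new byte[1024];                        // two full 512-byte chunks
    byte[] sums = new byte[2 * disk.getChecksumSize()];  // room for two 4-byte CRC32C values
    // Compute the on-disk sums for data whose client checksums (possibly a
    // different polynomial) were already verified by verifyChunks().
    disk.calculateChunkedSums(ByteBuffer.wrap(data), ByteBuffer.wrap(sums));
    System.out.println("wrote " + sums.length + " bytes of CRC32C checksums");
  }
}
```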
- partialCrc = new PureJavaCrc32(); + partialCrc = DataChecksum.newDataChecksum( + diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum()); partialCrc.update(buf, 0, sizePartialChunk); LOG.info("Read in partial CRC chunk from disk for block " + block); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java index 867eebe5b52..38017cfdb88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/FSDatasetInterface.java @@ -33,6 +33,7 @@ import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.BlockListAsLongs; import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.util.DataChecksum; import org.apache.hadoop.util.DiskChecker.DiskErrorException; /** @@ -158,15 +159,23 @@ public interface FSDatasetInterface extends FSDatasetMBean { static class BlockWriteStreams { OutputStream dataOut; OutputStream checksumOut; - BlockWriteStreams(OutputStream dOut, OutputStream cOut) { + DataChecksum checksum; + + BlockWriteStreams(OutputStream dOut, OutputStream cOut, + DataChecksum checksum) { dataOut = dOut; checksumOut = cOut; + this.checksum = checksum; } void close() throws IOException { IOUtils.closeStream(dataOut); IOUtils.closeStream(checksumOut); } + + DataChecksum getChecksum() { + return checksum; + } } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java index 447b9337ce1..14c1258fe4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipeline.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import java.io.DataInputStream; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -169,7 +170,7 @@ class ReplicaInPipeline extends ReplicaInfo @Override // ReplicaInPipelineInterface public BlockWriteStreams createStreams(boolean isCreate, - int bytesPerChunk, int checksumSize) throws IOException { + DataChecksum requestedChecksum) throws IOException { File blockFile = getBlockFile(); File metaFile = getMetaFile(); if (DataNode.LOG.isDebugEnabled()) { @@ -180,30 +181,64 @@ class ReplicaInPipeline extends ReplicaInfo } long blockDiskSize = 0L; long crcDiskSize = 0L; - if (!isCreate) { // check on disk file - blockDiskSize = bytesOnDisk; - crcDiskSize = BlockMetadataHeader.getHeaderSize() + - (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize; - if (blockDiskSize>0 && - (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) { - throw new IOException("Corrupted block: " + this); + + // the checksum that should actually be used -- this + // may differ from requestedChecksum for appends. + DataChecksum checksum; + + RandomAccessFile metaRAF = new RandomAccessFile(metaFile, "rw"); + + if (!isCreate) { + // For append or recovery, we must enforce the existing checksum. + // Also, verify that the file has correct lengths, etc. 
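The rule createStreams() is about to enforce: an appender may use a different checksum algorithm than the replica on disk (the receiver translates it, as above), but it must use the same chunk size. A small illustration with made-up values:

```java
import org.apache.hadoop.util.DataChecksum;

public class AppendCompatibilitySketch {
  public static void main(String[] args) {
    DataChecksum onDisk = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512);

    // Allowed: same chunk size, different algorithm -- checksums get translated on write.
    DataChecksum crc32c = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32C, 512);
    System.out.println(onDisk.getBytesPerChecksum() == crc32c.getBytesPerChecksum());   // true

    // Rejected with an IOException by createStreams(): different chunk size.
    DataChecksum bigChunks = DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32C, 1024);
    System.out.println(onDisk.getBytesPerChecksum() == bigChunks.getBytesPerChecksum()); // false
  }
}
```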
+ boolean checkedMeta = false; + try { + BlockMetadataHeader header = BlockMetadataHeader.readHeader(metaRAF); + checksum = header.getChecksum(); + + if (checksum.getBytesPerChecksum() != + requestedChecksum.getBytesPerChecksum()) { + throw new IOException("Client requested checksum " + + requestedChecksum + " when appending to an existing block " + + "with different chunk size: " + checksum); + } + + int bytesPerChunk = checksum.getBytesPerChecksum(); + int checksumSize = checksum.getChecksumSize(); + + blockDiskSize = bytesOnDisk; + crcDiskSize = BlockMetadataHeader.getHeaderSize() + + (blockDiskSize+bytesPerChunk-1)/bytesPerChunk*checksumSize; + if (blockDiskSize>0 && + (blockDiskSize>blockFile.length() || crcDiskSize>metaFile.length())) { + throw new IOException("Corrupted block: " + this); + } + checkedMeta = true; + } finally { + if (!checkedMeta) { + // clean up in case of exceptions. + IOUtils.closeStream(metaRAF); + } } + } else { + // for create, we can use the requested checksum + checksum = requestedChecksum; } + FileOutputStream blockOut = null; FileOutputStream crcOut = null; try { blockOut = new FileOutputStream( new RandomAccessFile( blockFile, "rw" ).getFD() ); - crcOut = new FileOutputStream( - new RandomAccessFile( metaFile, "rw" ).getFD() ); + crcOut = new FileOutputStream(metaRAF.getFD() ); if (!isCreate) { blockOut.getChannel().position(blockDiskSize); crcOut.getChannel().position(crcDiskSize); } - return new BlockWriteStreams(blockOut, crcOut); + return new BlockWriteStreams(blockOut, crcOut, checksum); } catch (IOException e) { IOUtils.closeStream(blockOut); - IOUtils.closeStream(crcOut); + IOUtils.closeStream(metaRAF); throw e; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java index 1acc76f4379..17eefa98da2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ReplicaInPipelineInterface.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.io.IOException; import org.apache.hadoop.hdfs.server.datanode.FSDatasetInterface.BlockWriteStreams; +import org.apache.hadoop.util.DataChecksum; /** * This defines the interface of a replica in Pipeline that's being written to @@ -61,11 +62,10 @@ interface ReplicaInPipelineInterface extends Replica { * one for block file and one for CRC file * * @param isCreate if it is for creation - * @param bytePerChunk number of bytes per CRC chunk - * @param checksumSize number of bytes per checksum + * @param requestedChecksum the checksum the writer would prefer to use * @return output streams for writing * @throws IOException if any error occurs */ public BlockWriteStreams createStreams(boolean isCreate, - int bytesPerChunk, int checksumSize) throws IOException; + DataChecksum requestedChecksum) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java new file mode 100644 index 00000000000..f296419bde5 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestAppendDifferentChecksum.java @@ -0,0 +1,165 @@ +/** + * Licensed to the 
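A worked example of the meta-file length check in createStreams(), with the header size written out as a constant (BlockMetadataHeader.getHeaderSize() covers the 2-byte version plus the 5-byte DataChecksum header) and an illustrative replica length:

```java
public class MetaFileSizeSketch {
  public static void main(String[] args) {
    int headerSize = 7;           // 2-byte version + 1-byte type + 4-byte bytesPerChecksum
    int bytesPerChunk = 512;      // chunk size recorded in the header
    int checksumSize = 4;         // CRC32 and CRC32C are both 4 bytes
    long blockDiskSize = 10000;   // bytes already on disk for this replica

    // Same round-up arithmetic as createStreams(): ceil(10000 / 512) = 20 chunks.
    long crcDiskSize = headerSize
        + (blockDiskSize + bytesPerChunk - 1) / bytesPerChunk * checksumSize;
    System.out.println(crcDiskSize); // 7 + 20 * 4 = 87 bytes expected in the meta file
  }
}
```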
Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs; + +import java.io.IOException; +import java.util.Random; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IOUtils; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; + +/** + * Test cases for trying to append to a file with a different + * checksum than the file was originally written with. + */ +public class TestAppendDifferentChecksum { + private static final int SEGMENT_LENGTH = 1500; + + // run the randomized test for 5 seconds + private static final long RANDOM_TEST_RUNTIME = 5000; + private static MiniDFSCluster cluster; + private static FileSystem fs; + + + @BeforeClass + public static void setupCluster() throws IOException { + Configuration conf = new HdfsConfiguration(); + conf.setInt(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, 4096); + + // disable block scanner, since otherwise this test can trigger + // HDFS-2525, which is a different bug than we're trying to unit test + // here! When HDFS-2525 is fixed, this can be removed. + conf.setInt(DFSConfigKeys.DFS_DATANODE_SCAN_PERIOD_HOURS_KEY, -1); + + conf.set("fs.hdfs.impl.disable.cache", "true"); + cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(1) + .build(); + fs = cluster.getFileSystem(); + } + + @AfterClass + public static void teardown() throws IOException { + if (cluster != null) { + cluster.shutdown(); + } + } + + /** + * This test does not run, since switching chunksize with append + * is not implemented. Please see HDFS-2130 for a discussion of the + * difficulties in doing so. + */ + @Test + @Ignore("this is not implemented! See HDFS-2130") + public void testSwitchChunkSize() throws IOException { + FileSystem fsWithSmallChunk = createFsWithChecksum("CRC32", 512); + FileSystem fsWithBigChunk = createFsWithChecksum("CRC32", 1024); + Path p = new Path("/testSwitchChunkSize"); + appendWithTwoFs(p, fsWithSmallChunk, fsWithBigChunk); + AppendTestUtil.check(fsWithSmallChunk, p, SEGMENT_LENGTH * 2); + AppendTestUtil.check(fsWithBigChunk, p, SEGMENT_LENGTH * 2); + } + + /** + * Simple unit test which writes some data with one algorithm, + * then appends with another. + */ + @Test + public void testSwitchAlgorithms() throws IOException { + FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512); + FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512); + + Path p = new Path("/testSwitchAlgorithms"); + appendWithTwoFs(p, fsWithCrc32, fsWithCrc32C); + // Regardless of which FS is used to read, it should pick up + // the on-disk checksum! 
+ AppendTestUtil.check(fsWithCrc32C, p, SEGMENT_LENGTH * 2); + AppendTestUtil.check(fsWithCrc32, p, SEGMENT_LENGTH * 2); + } + + /** + * Test which randomly alternates between appending with + * CRC32 and with CRC32C, crossing several block boundaries. + * Then, checks that all of the data can be read back correct. + */ + @Test(timeout=RANDOM_TEST_RUNTIME*2) + public void testAlgoSwitchRandomized() throws IOException { + FileSystem fsWithCrc32 = createFsWithChecksum("CRC32", 512); + FileSystem fsWithCrc32C = createFsWithChecksum("CRC32C", 512); + + Path p = new Path("/testAlgoSwitchRandomized"); + long seed = System.currentTimeMillis(); + System.out.println("seed: " + seed); + Random r = new Random(seed); + + // Create empty to start + IOUtils.closeStream(fsWithCrc32.create(p)); + + long st = System.currentTimeMillis(); + int len = 0; + while (System.currentTimeMillis() - st < RANDOM_TEST_RUNTIME) { + int thisLen = r.nextInt(500); + FileSystem fs = (r.nextBoolean() ? fsWithCrc32 : fsWithCrc32C); + FSDataOutputStream stm = fs.append(p); + try { + AppendTestUtil.write(stm, len, thisLen); + } finally { + stm.close(); + } + len += thisLen; + } + + AppendTestUtil.check(fsWithCrc32, p, len); + AppendTestUtil.check(fsWithCrc32C, p, len); + } + + private FileSystem createFsWithChecksum(String type, int bytes) + throws IOException { + Configuration conf = new Configuration(fs.getConf()); + conf.set(DFSConfigKeys.DFS_CHECKSUM_TYPE_KEY, type); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, bytes); + return FileSystem.get(conf); + } + + + private void appendWithTwoFs(Path p, FileSystem fs1, FileSystem fs2) + throws IOException { + FSDataOutputStream stm = fs1.create(p); + try { + AppendTestUtil.write(stm, 0, SEGMENT_LENGTH); + } finally { + stm.close(); + } + + stm = fs2.append(p); + try { + AppendTestUtil.write(stm, SEGMENT_LENGTH, SEGMENT_LENGTH); + } finally { + stm.close(); + } + } + +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java index d02ae1da353..4333f1c0f9a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDataTransferProtocol.java @@ -74,7 +74,7 @@ public class TestDataTransferProtocol extends TestCase { "org.apache.hadoop.hdfs.TestDataTransferProtocol"); private static final DataChecksum DEFAULT_CHECKSUM = - DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512); + DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32C, 512); DatanodeID datanode; InetSocketAddress dnAddr; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java index df8f0898634..ac823222837 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/SimulatedFSDataset.java @@ -204,13 +204,13 @@ public class SimulatedFSDataset implements FSDatasetInterface, Configurable{ @Override synchronized public BlockWriteStreams createStreams(boolean isCreate, - int bytesPerChunk, int checksumSize) throws IOException { + DataChecksum requestedChecksum) throws IOException { if (finalized) { 
throw new IOException("Trying to write to a finalized replica " + theBlock); } else { SimulatedOutputStream crcStream = new SimulatedOutputStream(); - return new BlockWriteStreams(oStream, crcStream); + return new BlockWriteStreams(oStream, crcStream, requestedChecksum); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java index a1ba8f71a8a..45356d9d196 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestSimulatedFSDataset.java @@ -64,7 +64,8 @@ public class TestSimulatedFSDataset extends TestCase { // we pass expected len as zero, - fsdataset should use the sizeof actual // data written ReplicaInPipelineInterface bInfo = fsdataset.createRbw(b); - BlockWriteStreams out = bInfo.createStreams(true, 512, 4); + BlockWriteStreams out = bInfo.createStreams(true, + DataChecksum.newDataChecksum(DataChecksum.CHECKSUM_CRC32, 512)); try { OutputStream dataOut = out.dataOut; assertEquals(0, fsdataset.getLength(b));