HDFS-8224. Schedule a block for scanning if its metadata file is corrupt. Contributed by Rushabh S Shah.

This commit is contained in:
Wei-Chiu Chuang 2016-08-10 11:34:28 -07:00
parent 2b31f6fb2c
commit 8efd4959f3
4 changed files with 105 additions and 5 deletions

View File

@ -122,8 +122,8 @@ public class DataChecksum implements Checksum {
int bpc = in.readInt();
DataChecksum summer = newDataChecksum(Type.valueOf(type), bpc );
if ( summer == null ) {
throw new IOException( "Could not create DataChecksum of type " +
type + " with bytesPerChecksum " + bpc );
throw new InvalidChecksumSizeException("Could not create DataChecksum "
+ "of type " + type + " with bytesPerChecksum " + bpc);
}
return summer;
}

View File

@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.util;
import java.io.IOException;
/**
* Thrown when bytesPerChecksun field in the meta file is less than
* or equal to 0 or type is invalid.
**/
public class InvalidChecksumSizeException extends IOException {
private static final long serialVersionUID = 1L;
public InvalidChecksumSizeException(String s) {
super(s);
}
}

View File

@ -205,6 +205,7 @@ import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.InvalidChecksumSizeException;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
@ -346,7 +347,7 @@ public class DataNode extends ReconfigurableBase
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
private boolean hasAnyBlockPoolRegistered = false;
private final BlockScanner blockScanner;
private BlockScanner blockScanner;
private DirectoryScanner directoryScanner = null;
/** Activated plug-ins. */
@ -2094,7 +2095,8 @@ public class DataNode extends ReconfigurableBase
LOG.warn(msg);
}
private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
@VisibleForTesting
void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
StorageType[] xferTargetStorageTypes) throws IOException {
BPOfferService bpos = getBPOSForBlock(block);
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
@ -2381,6 +2383,13 @@ public class DataNode extends ReconfigurableBase
metrics.incrBlocksReplicated();
}
} catch (IOException ie) {
if (ie instanceof InvalidChecksumSizeException) {
// Add the block to the front of the scanning queue if metadata file
// is corrupt. We already add the block to front of scanner if the
// peer disconnects.
LOG.info("Adding block: " + b + " for scanning");
blockScanner.markSuspectBlock(data.getVolume(b).getStorageID(), b);
}
LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
targets[0] + " got ", ie);
// check if there are any disk problem
@ -3299,4 +3308,9 @@ public class DataNode extends ReconfigurableBase
ScheduledThreadPoolExecutor getMetricsLoggerTimer() {
return metricsLoggerTimer;
}
@VisibleForTesting
void setBlockScanner(BlockScanner blockScanner) {
this.blockScanner = blockScanner;
}
}

View File

@ -23,10 +23,12 @@ import static org.junit.Assert.assertTrue;
import java.io.DataOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.InetSocketAddress;
import java.net.Socket;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -37,11 +39,13 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
@ -50,6 +54,7 @@ import org.apache.hadoop.util.Time;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
/**
* Test that datanodes can correctly handle errors during block read/write.
@ -223,4 +228,53 @@ public class TestDiskError {
long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck();
assertTrue("Disk Error check is not performed within " + dataNode.checkDiskErrorInterval + " ms", ((Time.monotonicNow()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime)));
}
@Test
public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException {
DataNode dn0 = cluster.getDataNodes().get(0);
// Make a mock blockScanner class and return false whenever isEnabled is
// called on blockScanner
BlockScanner mockScanner = Mockito.mock(BlockScanner.class);
Mockito.when(mockScanner.isEnabled()).thenReturn(false);
dn0.setBlockScanner(mockScanner);
Path filePath = new Path("test.dat");
FSDataOutputStream out = fs.create(filePath, (short) 1);
out.write(1);
out.hflush();
out.close();
// Corrupt the metadata file. Insert all 0's in the type and
// bytesPerChecksum files of the metadata header.
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
File metadataFile = cluster.getBlockMetadataFile(0, block);
RandomAccessFile raFile = new RandomAccessFile(metadataFile, "rw");
raFile.seek(2);
raFile.writeByte(0);
raFile.writeInt(0);
raFile.close();
String datanodeId0 = dn0.getDatanodeUuid();
LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
String storageId = lb.getStorageIDs()[0];
cluster.startDataNodes(conf, 1, true, null, null);
DataNode dn1 = null;
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
if (!cluster.getDataNodes().get(i).equals(datanodeId0)) {
dn1 = cluster.getDataNodes().get(i);
break;
}
}
DatanodeDescriptor dnd1 =
NameNodeAdapter.getDatanode(cluster.getNamesystem(),
dn1.getDatanodeId());
dn0.transferBlock(block, new DatanodeInfo[]{dnd1},
new StorageType[]{StorageType.DISK});
// Sleep for 1 second so the DataTrasnfer daemon can start transfer.
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// Do nothing
}
Mockito.verify(mockScanner).markSuspectBlock(Mockito.eq(storageId),
Mockito.eq(block));
}
}