HDFS-8224. Schedule a block for scanning if its metadata file is
corrupt. Contributed by Rushabh S Shah.
This commit is contained in:
parent
30a4c0bb63
commit
cdc48c5d5e
|
@ -122,8 +122,8 @@ public class DataChecksum implements Checksum {
|
|||
int bpc = in.readInt();
|
||||
DataChecksum summer = newDataChecksum(Type.valueOf(type), bpc );
|
||||
if ( summer == null ) {
|
||||
throw new IOException( "Could not create DataChecksum of type " +
|
||||
type + " with bytesPerChecksum " + bpc );
|
||||
throw new InvalidChecksumSizeException("Could not create DataChecksum "
|
||||
+ "of type " + type + " with bytesPerChecksum " + bpc);
|
||||
}
|
||||
return summer;
|
||||
}
|
||||
|
|
|
@ -0,0 +1,32 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import java.io.IOException;
|
||||
/**
|
||||
* Thrown when bytesPerChecksun field in the meta file is less than
|
||||
* or equal to 0 or type is invalid.
|
||||
**/
|
||||
public class InvalidChecksumSizeException extends IOException {
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public InvalidChecksumSizeException(String s) {
|
||||
super(s);
|
||||
}
|
||||
}
|
|
@ -194,6 +194,7 @@ import org.apache.hadoop.util.Daemon;
|
|||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
import org.apache.hadoop.util.GenericOptionsParser;
|
||||
import org.apache.hadoop.util.InvalidChecksumSizeException;
|
||||
import org.apache.hadoop.util.JvmPauseMonitor;
|
||||
import org.apache.hadoop.util.ServicePlugin;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
|
@ -317,7 +318,7 @@ public class DataNode extends ReconfigurableBase
|
|||
BlockPoolTokenSecretManager blockPoolTokenSecretManager;
|
||||
private boolean hasAnyBlockPoolRegistered = false;
|
||||
|
||||
private final BlockScanner blockScanner;
|
||||
private BlockScanner blockScanner;
|
||||
private DirectoryScanner directoryScanner = null;
|
||||
|
||||
/** Activated plug-ins. */
|
||||
|
@ -1864,7 +1865,8 @@ public class DataNode extends ReconfigurableBase
|
|||
LOG.warn(msg);
|
||||
}
|
||||
|
||||
private void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
|
||||
@VisibleForTesting
|
||||
void transferBlock(ExtendedBlock block, DatanodeInfo[] xferTargets,
|
||||
StorageType[] xferTargetStorageTypes) throws IOException {
|
||||
BPOfferService bpos = getBPOSForBlock(block);
|
||||
DatanodeRegistration bpReg = getDNRegistrationForBP(block.getBlockPoolId());
|
||||
|
@ -2149,6 +2151,13 @@ public class DataNode extends ReconfigurableBase
|
|||
}
|
||||
}
|
||||
} catch (IOException ie) {
|
||||
if (ie instanceof InvalidChecksumSizeException) {
|
||||
// Add the block to the front of the scanning queue if metadata file
|
||||
// is corrupt. We already add the block to front of scanner if the
|
||||
// peer disconnects.
|
||||
LOG.info("Adding block: " + b + " for scanning");
|
||||
blockScanner.markSuspectBlock(data.getVolume(b).getStorageID(), b);
|
||||
}
|
||||
LOG.warn(bpReg + ":Failed to transfer " + b + " to " +
|
||||
targets[0] + " got ", ie);
|
||||
// check if there are any disk problem
|
||||
|
@ -3216,4 +3225,9 @@ public class DataNode extends ReconfigurableBase
|
|||
checkSuperuserPrivilege();
|
||||
spanReceiverHost.removeSpanReceiver(id);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void setBlockScanner(BlockScanner blockScanner) {
|
||||
this.blockScanner = blockScanner;
|
||||
}
|
||||
}
|
|
@ -23,10 +23,12 @@ import static org.junit.Assert.assertTrue;
|
|||
import java.io.DataOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.io.RandomAccessFile;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.Socket;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -37,11 +39,13 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
|
|||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
|
||||
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
@ -49,6 +53,7 @@ import org.apache.hadoop.util.Time;
|
|||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
/**
|
||||
* Test that datanodes can correctly handle errors during block read/write.
|
||||
|
@ -219,4 +224,53 @@ public class TestDiskError {
|
|||
long lastDiskErrorCheck = dataNode.getLastDiskErrorCheck();
|
||||
assertTrue("Disk Error check is not performed within " + dataNode.checkDiskErrorInterval + " ms", ((Time.monotonicNow()-lastDiskErrorCheck) < (dataNode.checkDiskErrorInterval + slackTime)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDataTransferWhenBytesPerChecksumIsZero() throws IOException {
|
||||
DataNode dn0 = cluster.getDataNodes().get(0);
|
||||
// Make a mock blockScanner class and return false whenever isEnabled is
|
||||
// called on blockScanner
|
||||
BlockScanner mockScanner = Mockito.mock(BlockScanner.class);
|
||||
Mockito.when(mockScanner.isEnabled()).thenReturn(false);
|
||||
dn0.setBlockScanner(mockScanner);
|
||||
Path filePath = new Path("test.dat");
|
||||
FSDataOutputStream out = fs.create(filePath, (short) 1);
|
||||
out.write(1);
|
||||
out.hflush();
|
||||
out.close();
|
||||
// Corrupt the metadata file. Insert all 0's in the type and
|
||||
// bytesPerChecksum files of the metadata header.
|
||||
ExtendedBlock block = DFSTestUtil.getFirstBlock(fs, filePath);
|
||||
File metadataFile = cluster.getBlockMetadataFile(0, block);
|
||||
RandomAccessFile raFile = new RandomAccessFile(metadataFile, "rw");
|
||||
raFile.seek(2);
|
||||
raFile.writeByte(0);
|
||||
raFile.writeInt(0);
|
||||
raFile.close();
|
||||
String datanodeId0 = dn0.getDatanodeUuid();
|
||||
LocatedBlock lb = DFSTestUtil.getAllBlocks(fs, filePath).get(0);
|
||||
String storageId = lb.getStorageIDs()[0];
|
||||
cluster.startDataNodes(conf, 1, true, null, null);
|
||||
DataNode dn1 = null;
|
||||
for (int i = 0; i < cluster.getDataNodes().size(); i++) {
|
||||
if (!cluster.getDataNodes().get(i).equals(datanodeId0)) {
|
||||
dn1 = cluster.getDataNodes().get(i);
|
||||
break;
|
||||
}
|
||||
}
|
||||
DatanodeDescriptor dnd1 =
|
||||
NameNodeAdapter.getDatanode(cluster.getNamesystem(),
|
||||
dn1.getDatanodeId());
|
||||
|
||||
dn0.transferBlock(block, new DatanodeInfo[]{dnd1},
|
||||
new StorageType[]{StorageType.DISK});
|
||||
// Sleep for 1 second so the DataTrasnfer daemon can start transfer.
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch (InterruptedException e) {
|
||||
// Do nothing
|
||||
}
|
||||
Mockito.verify(mockScanner).markSuspectBlock(Mockito.eq(storageId),
|
||||
Mockito.eq(block));
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue