HDFS-3130. Move fsdataset implementation to a package.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1308437 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
a4ccb8f504
commit
bc13dfb142
|
@ -296,6 +296,8 @@ Release 2.0.0 - UNRELEASED
|
|||
|
||||
HDFS-3144. Refactor DatanodeID#getName by use. (eli)
|
||||
|
||||
HDFS-3130. Move fsdataset implementation to a package. (szetszwo)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
|
||||
|
|
|
@ -215,7 +215,7 @@
|
|||
the OBL warning.
|
||||
-->
|
||||
<Match>
|
||||
<Class name="org.apache.hadoop.hdfs.server.datanode.FSDataset" />
|
||||
<Class name="org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetImpl" />
|
||||
<Method name="getTmpInputStreams" />
|
||||
<Bug pattern="OBL_UNSATISFIED_OBLIGATION" />
|
||||
</Match>
|
||||
|
|
|
@ -86,7 +86,7 @@ public class BlockMetadataHeader {
|
|||
* @return
|
||||
* @throws IOException
|
||||
*/
|
||||
static BlockMetadataHeader readHeader(File file) throws IOException {
|
||||
public static BlockMetadataHeader readHeader(File file) throws IOException {
|
||||
DataInputStream in = null;
|
||||
try {
|
||||
in = new DataInputStream(new BufferedInputStream(
|
||||
|
@ -144,7 +144,7 @@ public class BlockMetadataHeader {
|
|||
/**
|
||||
* Returns the size of the header
|
||||
*/
|
||||
static int getHeaderSize() {
|
||||
public static int getHeaderSize() {
|
||||
return Short.SIZE/Byte.SIZE + DataChecksum.getChecksumHeaderSize();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -540,8 +540,8 @@ public class DataNode extends Configured
|
|||
}
|
||||
}
|
||||
|
||||
// calls specific to BP
|
||||
protected void notifyNamenodeDeletedBlock(ExtendedBlock block) {
|
||||
/** Notify the corresponding namenode to delete the block. */
|
||||
public void notifyNamenodeDeletedBlock(ExtendedBlock block) {
|
||||
BPOfferService bpos = blockPoolManager.get(block.getBlockPoolId());
|
||||
if (bpos != null) {
|
||||
bpos.notifyNamenodeDeletedBlock(block);
|
||||
|
@ -1134,9 +1134,8 @@ public class DataNode extends Configured
|
|||
|
||||
/**
|
||||
* Check if there is a disk failure and if so, handle the error
|
||||
*
|
||||
**/
|
||||
protected void checkDiskError( ) {
|
||||
*/
|
||||
public void checkDiskError() {
|
||||
try {
|
||||
data.checkDataDir();
|
||||
} catch (DiskErrorException de) {
|
||||
|
@ -1691,6 +1690,12 @@ public class DataNode extends Configured
|
|||
return data;
|
||||
}
|
||||
|
||||
/** @return the block scanner. */
|
||||
public DataBlockScanner getBlockScanner() {
|
||||
return blockScanner;
|
||||
}
|
||||
|
||||
|
||||
public static void secureMain(String args[], SecureResources resources) {
|
||||
try {
|
||||
StringUtils.startupShutdownMessage(DataNode.class, args, LOG);
|
||||
|
|
|
@ -30,28 +30,25 @@ import java.util.HashMap;
|
|||
import java.util.Iterator;
|
||||
import java.util.Map;
|
||||
import java.util.Properties;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.HardLink;
|
||||
import org.apache.hadoop.fs.LocalFileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.fs.HardLink;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.fs.permission.FsPermission;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion;
|
||||
import org.apache.hadoop.hdfs.protocol.LayoutVersion.Feature;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.common.InconsistentFSStateException;
|
||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||
import org.apache.hadoop.hdfs.server.common.StorageInfo;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NodeType;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.Daemon;
|
||||
|
@ -65,7 +62,7 @@ import org.apache.hadoop.util.DiskChecker;
|
|||
@InterfaceAudience.Private
|
||||
public class DataStorage extends Storage {
|
||||
// Constants
|
||||
final static String BLOCK_SUBDIR_PREFIX = "subdir";
|
||||
public final static String BLOCK_SUBDIR_PREFIX = "subdir";
|
||||
final static String BLOCK_FILE_PREFIX = "blk_";
|
||||
final static String COPY_FILE_PREFIX = "dncp_";
|
||||
final static String STORAGE_DIR_DETACHED = "detach";
|
||||
|
@ -98,15 +95,17 @@ public class DataStorage extends Storage {
|
|||
this.storageID = strgID;
|
||||
}
|
||||
|
||||
synchronized String getStorageID() {
|
||||
/** @return storage ID. */
|
||||
public synchronized String getStorageID() {
|
||||
return storageID;
|
||||
}
|
||||
|
||||
synchronized void setStorageID(String newStorageID) {
|
||||
this.storageID = newStorageID;
|
||||
}
|
||||
|
||||
synchronized void createStorageID(int datanodePort) {
|
||||
|
||||
/** Create an ID for this storage. */
|
||||
public synchronized void createStorageID(int datanodePort) {
|
||||
if (storageID != null && !storageID.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
|
|
|
@ -18,21 +18,17 @@
|
|||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
|
||||
/** Provide utility methods for Datanode. */
|
||||
@InterfaceAudience.Private
|
||||
class DatanodeUtil {
|
||||
static final String METADATA_EXTENSION = ".meta";
|
||||
public class DatanodeUtil {
|
||||
public static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
|
||||
|
||||
static final String UNLINK_BLOCK_SUFFIX = ".unlinked";
|
||||
|
||||
private static final String DISK_ERROR = "Possible disk error: ";
|
||||
public static final String DISK_ERROR = "Possible disk error: ";
|
||||
|
||||
/** Get the cause of an I/O exception if caused by a possible disk error
|
||||
* @param ioe an I/O exception
|
||||
|
@ -52,55 +48,34 @@ class DatanodeUtil {
|
|||
* @throws IOException
|
||||
* if the file already exists or if the file cannot be created.
|
||||
*/
|
||||
static File createTmpFile(Block b, File f) throws IOException {
|
||||
public static File createTmpFile(Block b, File f) throws IOException {
|
||||
if (f.exists()) {
|
||||
throw new IOException("Unexpected problem in creating temporary file for "
|
||||
+ b + ". File " + f + " should not be present, but is.");
|
||||
throw new IOException("Failed to create temporary file for " + b
|
||||
+ ". File " + f + " should not be present, but is.");
|
||||
}
|
||||
// Create the zero-length temp file
|
||||
final boolean fileCreated;
|
||||
try {
|
||||
fileCreated = f.createNewFile();
|
||||
} catch (IOException ioe) {
|
||||
throw (IOException)new IOException(DISK_ERROR + f).initCause(ioe);
|
||||
throw new IOException(DISK_ERROR + "Failed to create " + f, ioe);
|
||||
}
|
||||
if (!fileCreated) {
|
||||
throw new IOException("Unexpected problem in creating temporary file for "
|
||||
+ b + ". File " + f + " should be creatable, but is already present.");
|
||||
throw new IOException("Failed to create temporary file for " + b
|
||||
+ ". File " + f + " should be creatable, but is already present.");
|
||||
}
|
||||
return f;
|
||||
}
|
||||
|
||||
static String getMetaFileName(String blockFileName, long genStamp) {
|
||||
return blockFileName + "_" + genStamp + METADATA_EXTENSION;
|
||||
}
|
||||
|
||||
static File getMetaFile(File f, long genStamp) {
|
||||
return new File(getMetaFileName(f.getAbsolutePath(), genStamp));
|
||||
/**
|
||||
* @return the meta name given the block name and generation stamp.
|
||||
*/
|
||||
public static String getMetaName(String blockName, long generationStamp) {
|
||||
return blockName + "_" + generationStamp + Block.METADATA_EXTENSION;
|
||||
}
|
||||
|
||||
/** Find the corresponding meta data file from a given block file */
|
||||
static File findMetaFile(final File blockFile) throws IOException {
|
||||
final String prefix = blockFile.getName() + "_";
|
||||
final File parent = blockFile.getParentFile();
|
||||
File[] matches = parent.listFiles(new FilenameFilter() {
|
||||
public boolean accept(File dir, String name) {
|
||||
return dir.equals(parent)
|
||||
&& name.startsWith(prefix) && name.endsWith(METADATA_EXTENSION);
|
||||
}
|
||||
});
|
||||
|
||||
if (matches == null || matches.length == 0) {
|
||||
throw new IOException("Meta file not found, blockFile=" + blockFile);
|
||||
}
|
||||
else if (matches.length > 1) {
|
||||
throw new IOException("Found more than one meta files: "
|
||||
+ Arrays.asList(matches));
|
||||
}
|
||||
return matches[0];
|
||||
}
|
||||
|
||||
static File getUnlinkTmpFile(File f) {
|
||||
/** @return the unlink file. */
|
||||
public static File getUnlinkTmpFile(File f) {
|
||||
return new File(f.getParentFile(), f.getName()+UNLINK_BLOCK_SUFFIX);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -26,7 +26,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|||
/**
|
||||
* This class describes a replica that has been finalized.
|
||||
*/
|
||||
class FinalizedReplica extends ReplicaInfo {
|
||||
public class FinalizedReplica extends ReplicaInfo {
|
||||
private boolean unlinked; // copy-on-write done for block
|
||||
|
||||
/**
|
||||
|
@ -37,7 +37,7 @@ class FinalizedReplica extends ReplicaInfo {
|
|||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
FinalizedReplica(long blockId, long len, long genStamp,
|
||||
public FinalizedReplica(long blockId, long len, long genStamp,
|
||||
FsVolumeSpi vol, File dir) {
|
||||
super(blockId, len, genStamp, vol, dir);
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ class FinalizedReplica extends ReplicaInfo {
|
|||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
|
||||
public FinalizedReplica(Block block, FsVolumeSpi vol, File dir) {
|
||||
super(block, vol, dir);
|
||||
}
|
||||
|
||||
|
@ -56,7 +56,7 @@ class FinalizedReplica extends ReplicaInfo {
|
|||
* Copy constructor.
|
||||
* @param from
|
||||
*/
|
||||
FinalizedReplica(FinalizedReplica from) {
|
||||
public FinalizedReplica(FinalizedReplica from) {
|
||||
super(from);
|
||||
this.unlinked = from.isUnlinked();
|
||||
}
|
||||
|
@ -67,12 +67,12 @@ class FinalizedReplica extends ReplicaInfo {
|
|||
}
|
||||
|
||||
@Override // ReplicaInfo
|
||||
boolean isUnlinked() {
|
||||
public boolean isUnlinked() {
|
||||
return unlinked;
|
||||
}
|
||||
|
||||
@Override // ReplicaInfo
|
||||
void setUnlinked() {
|
||||
public void setUnlinked() {
|
||||
unlinked = true;
|
||||
}
|
||||
|
||||
|
@ -99,6 +99,6 @@ class FinalizedReplica extends ReplicaInfo {
|
|||
@Override
|
||||
public String toString() {
|
||||
return super.toString()
|
||||
+ "\n unlinked=" + unlinked;
|
||||
+ "\n unlinked =" + unlinked;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,7 +25,7 @@ import java.io.IOException;
|
|||
* Exception indicating that the target block already exists
|
||||
* and is not set to be recovered/overwritten.
|
||||
*/
|
||||
class ReplicaAlreadyExistsException extends IOException {
|
||||
public class ReplicaAlreadyExistsException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
public ReplicaAlreadyExistsException() {
|
||||
|
|
|
@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|||
* Those are the replicas that
|
||||
* are created in a pipeline initiated by a dfs client.
|
||||
*/
|
||||
class ReplicaBeingWritten extends ReplicaInPipeline {
|
||||
public class ReplicaBeingWritten extends ReplicaInPipeline {
|
||||
/**
|
||||
* Constructor for a zero length replica
|
||||
* @param blockId block id
|
||||
|
@ -35,7 +35,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
|
|||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaBeingWritten(long blockId, long genStamp,
|
||||
public ReplicaBeingWritten(long blockId, long genStamp,
|
||||
FsVolumeSpi vol, File dir) {
|
||||
super( blockId, genStamp, vol, dir);
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
|
|||
* @param dir directory path where block and meta files are located
|
||||
* @param writer a thread that is writing to this replica
|
||||
*/
|
||||
ReplicaBeingWritten(Block block,
|
||||
public ReplicaBeingWritten(Block block,
|
||||
FsVolumeSpi vol, File dir, Thread writer) {
|
||||
super( block, vol, dir, writer);
|
||||
}
|
||||
|
@ -61,7 +61,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
|
|||
* @param dir directory path where block and meta files are located
|
||||
* @param writer a thread that is writing to this replica
|
||||
*/
|
||||
ReplicaBeingWritten(long blockId, long len, long genStamp,
|
||||
public ReplicaBeingWritten(long blockId, long len, long genStamp,
|
||||
FsVolumeSpi vol, File dir, Thread writer ) {
|
||||
super( blockId, len, genStamp, vol, dir, writer);
|
||||
}
|
||||
|
@ -70,7 +70,7 @@ class ReplicaBeingWritten extends ReplicaInPipeline {
|
|||
* Copy constructor.
|
||||
* @param from
|
||||
*/
|
||||
ReplicaBeingWritten(ReplicaBeingWritten from) {
|
||||
public ReplicaBeingWritten(ReplicaBeingWritten from) {
|
||||
super(from);
|
||||
}
|
||||
|
||||
|
|
|
@ -37,7 +37,7 @@ import org.apache.hadoop.util.DataChecksum;
|
|||
*
|
||||
* The base class implements a temporary replica
|
||||
*/
|
||||
class ReplicaInPipeline extends ReplicaInfo
|
||||
public class ReplicaInPipeline extends ReplicaInfo
|
||||
implements ReplicaInPipelineInterface {
|
||||
private long bytesAcked;
|
||||
private long bytesOnDisk;
|
||||
|
@ -50,9 +50,8 @@ class ReplicaInPipeline extends ReplicaInfo
|
|||
* @param genStamp replica generation stamp
|
||||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
* @param state replica state
|
||||
*/
|
||||
ReplicaInPipeline(long blockId, long genStamp,
|
||||
public ReplicaInPipeline(long blockId, long genStamp,
|
||||
FsVolumeSpi vol, File dir) {
|
||||
this( blockId, 0L, genStamp, vol, dir, Thread.currentThread());
|
||||
}
|
||||
|
@ -91,7 +90,7 @@ class ReplicaInPipeline extends ReplicaInfo
|
|||
* Copy constructor.
|
||||
* @param from
|
||||
*/
|
||||
ReplicaInPipeline(ReplicaInPipeline from) {
|
||||
public ReplicaInPipeline(ReplicaInPipeline from) {
|
||||
super(from);
|
||||
this.bytesAcked = from.getBytesAcked();
|
||||
this.bytesOnDisk = from.getBytesOnDisk();
|
||||
|
@ -151,7 +150,7 @@ class ReplicaInPipeline extends ReplicaInfo
|
|||
* Interrupt the writing thread and wait until it dies
|
||||
* @throws IOException the waiting is interrupted
|
||||
*/
|
||||
void stopWriter() throws IOException {
|
||||
public void stopWriter() throws IOException {
|
||||
if (writer != null && writer != Thread.currentThread() && writer.isAlive()) {
|
||||
writer.interrupt();
|
||||
try {
|
||||
|
|
|
@ -84,20 +84,12 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
ReplicaInfo(ReplicaInfo from) {
|
||||
this(from, from.getVolume(), from.getDir());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get this replica's meta file name
|
||||
* @return this replica's meta file name
|
||||
*/
|
||||
private String getMetaFileName() {
|
||||
return getBlockName() + "_" + getGenerationStamp() + METADATA_EXTENSION;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the full path of this replica's data file
|
||||
* @return the full path of this replica's data file
|
||||
*/
|
||||
File getBlockFile() {
|
||||
public File getBlockFile() {
|
||||
return new File(getDir(), getBlockName());
|
||||
}
|
||||
|
||||
|
@ -105,15 +97,16 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
* Get the full path of this replica's meta file
|
||||
* @return the full path of this replica's meta file
|
||||
*/
|
||||
File getMetaFile() {
|
||||
return new File(getDir(), getMetaFileName());
|
||||
public File getMetaFile() {
|
||||
return new File(getDir(),
|
||||
DatanodeUtil.getMetaName(getBlockName(), getGenerationStamp()));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the volume where this replica is located on disk
|
||||
* @return the volume where this replica is located on disk
|
||||
*/
|
||||
FsVolumeSpi getVolume() {
|
||||
public FsVolumeSpi getVolume() {
|
||||
return volume;
|
||||
}
|
||||
|
||||
|
@ -136,7 +129,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
* Set the parent directory where this replica is located
|
||||
* @param dir the parent directory where the replica is located
|
||||
*/
|
||||
void setDir(File dir) {
|
||||
public void setDir(File dir) {
|
||||
this.dir = dir;
|
||||
}
|
||||
|
||||
|
@ -145,14 +138,14 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
* @return true if the replica has already been unlinked
|
||||
* or no need to be detached; false otherwise
|
||||
*/
|
||||
boolean isUnlinked() {
|
||||
public boolean isUnlinked() {
|
||||
return true; // no need to be unlinked
|
||||
}
|
||||
|
||||
/**
|
||||
* set that this replica is unlinked
|
||||
*/
|
||||
void setUnlinked() {
|
||||
public void setUnlinked() {
|
||||
// no need to be unlinked
|
||||
}
|
||||
|
||||
|
@ -201,7 +194,7 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
* false if it is already detached or no need to be detached
|
||||
* @throws IOException if there is any copy error
|
||||
*/
|
||||
boolean unlinkBlock(int numLinks) throws IOException {
|
||||
public boolean unlinkBlock(int numLinks) throws IOException {
|
||||
if (isUnlinked()) {
|
||||
return false;
|
||||
}
|
||||
|
|
|
@ -29,21 +29,21 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
|||
*/
|
||||
public class ReplicaNotFoundException extends IOException {
|
||||
private static final long serialVersionUID = 1L;
|
||||
final static String NON_RBW_REPLICA = "Cannot recover a non-RBW replica ";
|
||||
final static String UNFINALIZED_REPLICA =
|
||||
public final static String NON_RBW_REPLICA = "Cannot recover a non-RBW replica ";
|
||||
public final static String UNFINALIZED_REPLICA =
|
||||
"Cannot append to an unfinalized replica ";
|
||||
final static String UNFINALIZED_AND_NONRBW_REPLICA =
|
||||
public final static String UNFINALIZED_AND_NONRBW_REPLICA =
|
||||
"Cannot recover append/close to a replica that's not FINALIZED and not RBW ";
|
||||
final static String NON_EXISTENT_REPLICA =
|
||||
public final static String NON_EXISTENT_REPLICA =
|
||||
"Cannot append to a non-existent replica ";
|
||||
final static String UNEXPECTED_GS_REPLICA =
|
||||
public final static String UNEXPECTED_GS_REPLICA =
|
||||
"Cannot append to a replica with unexpeted generation stamp ";
|
||||
|
||||
public ReplicaNotFoundException() {
|
||||
super();
|
||||
}
|
||||
|
||||
ReplicaNotFoundException(ExtendedBlock b) {
|
||||
public ReplicaNotFoundException(ExtendedBlock b) {
|
||||
super("Replica not found for " + b);
|
||||
}
|
||||
|
||||
|
|
|
@ -31,12 +31,12 @@ import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
|||
* A recovery with higher recovery id preempts recoveries with a lower id.
|
||||
*
|
||||
*/
|
||||
class ReplicaUnderRecovery extends ReplicaInfo {
|
||||
public class ReplicaUnderRecovery extends ReplicaInfo {
|
||||
private ReplicaInfo original; // the original replica that needs to be recovered
|
||||
private long recoveryId; // recovery id; it is also the generation stamp
|
||||
// that the replica will be bumped to after recovery
|
||||
|
||||
ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
|
||||
public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
|
||||
super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
|
||||
replica.getVolume(), replica.getDir());
|
||||
if ( replica.getState() != ReplicaState.FINALIZED &&
|
||||
|
@ -52,7 +52,7 @@ class ReplicaUnderRecovery extends ReplicaInfo {
|
|||
* Copy constructor.
|
||||
* @param from
|
||||
*/
|
||||
ReplicaUnderRecovery(ReplicaUnderRecovery from) {
|
||||
public ReplicaUnderRecovery(ReplicaUnderRecovery from) {
|
||||
super(from);
|
||||
this.original = from.getOriginalReplica();
|
||||
this.recoveryId = from.getRecoveryID();
|
||||
|
@ -62,7 +62,7 @@ class ReplicaUnderRecovery extends ReplicaInfo {
|
|||
* Get the recovery id
|
||||
* @return the generation stamp that the replica will be bumped to
|
||||
*/
|
||||
long getRecoveryID() {
|
||||
public long getRecoveryID() {
|
||||
return recoveryId;
|
||||
}
|
||||
|
||||
|
@ -70,7 +70,7 @@ class ReplicaUnderRecovery extends ReplicaInfo {
|
|||
* Set the recovery id
|
||||
* @param recoveryId the new recoveryId
|
||||
*/
|
||||
void setRecoveryID(long recoveryId) {
|
||||
public void setRecoveryID(long recoveryId) {
|
||||
if (recoveryId > this.recoveryId) {
|
||||
this.recoveryId = recoveryId;
|
||||
} else {
|
||||
|
@ -83,17 +83,17 @@ class ReplicaUnderRecovery extends ReplicaInfo {
|
|||
* Get the original replica that's under recovery
|
||||
* @return the original replica under recovery
|
||||
*/
|
||||
ReplicaInfo getOriginalReplica() {
|
||||
public ReplicaInfo getOriginalReplica() {
|
||||
return original;
|
||||
}
|
||||
|
||||
@Override //ReplicaInfo
|
||||
boolean isUnlinked() {
|
||||
public boolean isUnlinked() {
|
||||
return original.isUnlinked();
|
||||
}
|
||||
|
||||
@Override //ReplicaInfo
|
||||
void setUnlinked() {
|
||||
public void setUnlinked() {
|
||||
original.setUnlinked();
|
||||
}
|
||||
|
||||
|
@ -131,7 +131,7 @@ class ReplicaUnderRecovery extends ReplicaInfo {
|
|||
}
|
||||
|
||||
@Override //ReplicaInfo
|
||||
void setDir(File dir) {
|
||||
public void setDir(File dir) {
|
||||
super.setDir(dir);
|
||||
original.setDir(dir);
|
||||
}
|
||||
|
@ -159,7 +159,7 @@ class ReplicaUnderRecovery extends ReplicaInfo {
|
|||
+ "\n original=" + original;
|
||||
}
|
||||
|
||||
ReplicaRecoveryInfo createInfo() {
|
||||
public ReplicaRecoveryInfo createInfo() {
|
||||
return new ReplicaRecoveryInfo(original.getBlockId(),
|
||||
original.getBytesOnDisk(), original.getGenerationStamp(),
|
||||
original.getState());
|
||||
|
|
|
@ -32,7 +32,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
|||
* client continues to write or be recovered as a result of
|
||||
* lease recovery.
|
||||
*/
|
||||
class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
||||
public class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
||||
private boolean unlinked; // copy-on-write done for block
|
||||
|
||||
/**
|
||||
|
@ -43,7 +43,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
|||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp,
|
||||
public ReplicaWaitingToBeRecovered(long blockId, long len, long genStamp,
|
||||
FsVolumeSpi vol, File dir) {
|
||||
super(blockId, len, genStamp, vol, dir);
|
||||
}
|
||||
|
@ -54,7 +54,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
|||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaWaitingToBeRecovered(Block block, FsVolumeSpi vol, File dir) {
|
||||
public ReplicaWaitingToBeRecovered(Block block, FsVolumeSpi vol, File dir) {
|
||||
super(block, vol, dir);
|
||||
}
|
||||
|
||||
|
@ -62,7 +62,7 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
|||
* Copy constructor.
|
||||
* @param from
|
||||
*/
|
||||
ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) {
|
||||
public ReplicaWaitingToBeRecovered(ReplicaWaitingToBeRecovered from) {
|
||||
super(from);
|
||||
this.unlinked = from.isUnlinked();
|
||||
}
|
||||
|
@ -73,12 +73,12 @@ class ReplicaWaitingToBeRecovered extends ReplicaInfo {
|
|||
}
|
||||
|
||||
@Override //ReplicaInfo
|
||||
boolean isUnlinked() {
|
||||
public boolean isUnlinked() {
|
||||
return unlinked;
|
||||
}
|
||||
|
||||
@Override //ReplicaInfo
|
||||
void setUnlinked() {
|
||||
public void setUnlinked() {
|
||||
unlinked = true;
|
||||
}
|
||||
|
||||
|
|
|
@ -33,9 +33,9 @@ import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDataset;
|
||||
import org.apache.hadoop.hdfs.server.datanode.Replica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipelineInterface;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
|
||||
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
|
@ -58,7 +58,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
@SuppressWarnings("rawtypes")
|
||||
final Class<? extends Factory> clazz = conf.getClass(
|
||||
DFSConfigKeys.DFS_DATANODE_FSDATASET_FACTORY_KEY,
|
||||
FSDataset.Factory.class,
|
||||
FsDatasetFactory.class,
|
||||
Factory.class);
|
||||
return ReflectionUtils.newInstance(clazz, conf);
|
||||
}
|
||||
|
|
|
@ -0,0 +1,286 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.BufferedInputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DU;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||
import org.apache.hadoop.hdfs.server.datanode.BlockMetadataHeader;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
||||
/**
|
||||
* A block pool slice represents a portion of a block pool stored on a volume.
|
||||
* Taken together, all BlockPoolSlices sharing a block pool ID across a
|
||||
* cluster represent a single block pool.
|
||||
*
|
||||
* This class is synchronized by {@link FsVolumeImpl}.
|
||||
*/
|
||||
class BlockPoolSlice {
|
||||
private final String bpid;
|
||||
private final FsVolumeImpl volume; // volume to which this BlockPool belongs to
|
||||
private final File currentDir; // StorageDirectory/current/bpid/current
|
||||
private final LDir finalizedDir; // directory store Finalized replica
|
||||
private final File rbwDir; // directory store RBW replica
|
||||
private final File tmpDir; // directory store Temporary replica
|
||||
|
||||
// TODO:FEDERATION scalability issue - a thread per DU is needed
|
||||
private final DU dfsUsage;
|
||||
|
||||
/**
|
||||
* Create a blook pool slice
|
||||
* @param bpid Block pool Id
|
||||
* @param volume {@link FsVolumeImpl} to which this BlockPool belongs to
|
||||
* @param bpDir directory corresponding to the BlockPool
|
||||
* @param conf
|
||||
* @throws IOException
|
||||
*/
|
||||
BlockPoolSlice(String bpid, FsVolumeImpl volume, File bpDir,
|
||||
Configuration conf) throws IOException {
|
||||
this.bpid = bpid;
|
||||
this.volume = volume;
|
||||
this.currentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
|
||||
final File finalizedDir = new File(
|
||||
currentDir, DataStorage.STORAGE_DIR_FINALIZED);
|
||||
|
||||
// Files that were being written when the datanode was last shutdown
|
||||
// are now moved back to the data directory. It is possible that
|
||||
// in the future, we might want to do some sort of datanode-local
|
||||
// recovery for these blocks. For example, crc validation.
|
||||
//
|
||||
this.tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
|
||||
if (tmpDir.exists()) {
|
||||
FileUtil.fullyDelete(tmpDir);
|
||||
}
|
||||
this.rbwDir = new File(currentDir, DataStorage.STORAGE_DIR_RBW);
|
||||
final boolean supportAppends = conf.getBoolean(
|
||||
DFSConfigKeys.DFS_SUPPORT_APPEND_KEY,
|
||||
DFSConfigKeys.DFS_SUPPORT_APPEND_DEFAULT);
|
||||
if (rbwDir.exists() && !supportAppends) {
|
||||
FileUtil.fullyDelete(rbwDir);
|
||||
}
|
||||
final int maxBlocksPerDir = conf.getInt(
|
||||
DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_NUMBLOCKS_DEFAULT);
|
||||
this.finalizedDir = new LDir(finalizedDir, maxBlocksPerDir);
|
||||
if (!rbwDir.mkdirs()) { // create rbw directory if not exist
|
||||
if (!rbwDir.isDirectory()) {
|
||||
throw new IOException("Mkdirs failed to create " + rbwDir.toString());
|
||||
}
|
||||
}
|
||||
if (!tmpDir.mkdirs()) {
|
||||
if (!tmpDir.isDirectory()) {
|
||||
throw new IOException("Mkdirs failed to create " + tmpDir.toString());
|
||||
}
|
||||
}
|
||||
this.dfsUsage = new DU(bpDir, conf);
|
||||
this.dfsUsage.start();
|
||||
}
|
||||
|
||||
File getDirectory() {
|
||||
return currentDir.getParentFile();
|
||||
}
|
||||
|
||||
File getFinalizedDir() {
|
||||
return finalizedDir.dir;
|
||||
}
|
||||
|
||||
File getRbwDir() {
|
||||
return rbwDir;
|
||||
}
|
||||
|
||||
/** Run DU on local drives. It must be synchronized from caller. */
|
||||
void decDfsUsed(long value) {
|
||||
dfsUsage.decDfsUsed(value);
|
||||
}
|
||||
|
||||
long getDfsUsed() throws IOException {
|
||||
return dfsUsage.getUsed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Temporary files. They get moved to the finalized block directory when
|
||||
* the block is finalized.
|
||||
*/
|
||||
File createTmpFile(Block b) throws IOException {
|
||||
File f = new File(tmpDir, b.getBlockName());
|
||||
return DatanodeUtil.createTmpFile(b, f);
|
||||
}
|
||||
|
||||
/**
|
||||
* RBW files. They get moved to the finalized block directory when
|
||||
* the block is finalized.
|
||||
*/
|
||||
File createRbwFile(Block b) throws IOException {
|
||||
File f = new File(rbwDir, b.getBlockName());
|
||||
return DatanodeUtil.createTmpFile(b, f);
|
||||
}
|
||||
|
||||
File addBlock(Block b, File f) throws IOException {
|
||||
File blockFile = finalizedDir.addBlock(b, f);
|
||||
File metaFile = FsDatasetUtil.getMetaFile(blockFile, b.getGenerationStamp());
|
||||
dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
|
||||
return blockFile;
|
||||
}
|
||||
|
||||
void checkDirs() throws DiskErrorException {
|
||||
finalizedDir.checkDirTree();
|
||||
DiskChecker.checkDir(tmpDir);
|
||||
DiskChecker.checkDir(rbwDir);
|
||||
}
|
||||
|
||||
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
|
||||
// add finalized replicas
|
||||
finalizedDir.getVolumeMap(bpid, volumeMap, volume);
|
||||
// add rbw replicas
|
||||
addToReplicasMap(volumeMap, rbwDir, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add replicas under the given directory to the volume map
|
||||
* @param volumeMap the replicas map
|
||||
* @param dir an input directory
|
||||
* @param isFinalized true if the directory has finalized replicas;
|
||||
* false if the directory has rbw replicas
|
||||
*/
|
||||
void addToReplicasMap(ReplicaMap volumeMap, File dir, boolean isFinalized
|
||||
) throws IOException {
|
||||
File blockFiles[] = FileUtil.listFiles(dir);
|
||||
for (File blockFile : blockFiles) {
|
||||
if (!Block.isBlockFilename(blockFile))
|
||||
continue;
|
||||
|
||||
long genStamp = FsDatasetUtil.getGenerationStampFromFile(
|
||||
blockFiles, blockFile);
|
||||
long blockId = Block.filename2id(blockFile.getName());
|
||||
ReplicaInfo newReplica = null;
|
||||
if (isFinalized) {
|
||||
newReplica = new FinalizedReplica(blockId,
|
||||
blockFile.length(), genStamp, volume, blockFile.getParentFile());
|
||||
} else {
|
||||
newReplica = new ReplicaWaitingToBeRecovered(blockId,
|
||||
validateIntegrity(blockFile, genStamp),
|
||||
genStamp, volume, blockFile.getParentFile());
|
||||
}
|
||||
|
||||
ReplicaInfo oldReplica = volumeMap.add(bpid, newReplica);
|
||||
if (oldReplica != null) {
|
||||
FsDatasetImpl.LOG.warn("Two block files with the same block id exist " +
|
||||
"on disk: " + oldReplica.getBlockFile() + " and " + blockFile );
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Find out the number of bytes in the block that match its crc.
|
||||
*
|
||||
* This algorithm assumes that data corruption caused by unexpected
|
||||
* datanode shutdown occurs only in the last crc chunk. So it checks
|
||||
* only the last chunk.
|
||||
*
|
||||
* @param blockFile the block file
|
||||
* @param genStamp generation stamp of the block
|
||||
* @return the number of valid bytes
|
||||
*/
|
||||
private long validateIntegrity(File blockFile, long genStamp) {
|
||||
DataInputStream checksumIn = null;
|
||||
InputStream blockIn = null;
|
||||
try {
|
||||
final File metaFile = FsDatasetUtil.getMetaFile(blockFile, genStamp);
|
||||
long blockFileLen = blockFile.length();
|
||||
long metaFileLen = metaFile.length();
|
||||
int crcHeaderLen = DataChecksum.getChecksumHeaderSize();
|
||||
if (!blockFile.exists() || blockFileLen == 0 ||
|
||||
!metaFile.exists() || metaFileLen < crcHeaderLen) {
|
||||
return 0;
|
||||
}
|
||||
checksumIn = new DataInputStream(
|
||||
new BufferedInputStream(new FileInputStream(metaFile),
|
||||
HdfsConstants.IO_FILE_BUFFER_SIZE));
|
||||
|
||||
// read and handle the common header here. For now just a version
|
||||
BlockMetadataHeader header = BlockMetadataHeader.readHeader(checksumIn);
|
||||
short version = header.getVersion();
|
||||
if (version != BlockMetadataHeader.VERSION) {
|
||||
FsDatasetImpl.LOG.warn("Wrong version (" + version + ") for metadata file "
|
||||
+ metaFile + " ignoring ...");
|
||||
}
|
||||
DataChecksum checksum = header.getChecksum();
|
||||
int bytesPerChecksum = checksum.getBytesPerChecksum();
|
||||
int checksumSize = checksum.getChecksumSize();
|
||||
long numChunks = Math.min(
|
||||
(blockFileLen + bytesPerChecksum - 1)/bytesPerChecksum,
|
||||
(metaFileLen - crcHeaderLen)/checksumSize);
|
||||
if (numChunks == 0) {
|
||||
return 0;
|
||||
}
|
||||
IOUtils.skipFully(checksumIn, (numChunks-1)*checksumSize);
|
||||
blockIn = new FileInputStream(blockFile);
|
||||
long lastChunkStartPos = (numChunks-1)*bytesPerChecksum;
|
||||
IOUtils.skipFully(blockIn, lastChunkStartPos);
|
||||
int lastChunkSize = (int)Math.min(
|
||||
bytesPerChecksum, blockFileLen-lastChunkStartPos);
|
||||
byte[] buf = new byte[lastChunkSize+checksumSize];
|
||||
checksumIn.readFully(buf, lastChunkSize, checksumSize);
|
||||
IOUtils.readFully(blockIn, buf, 0, lastChunkSize);
|
||||
|
||||
checksum.update(buf, 0, lastChunkSize);
|
||||
if (checksum.compare(buf, lastChunkSize)) { // last chunk matches crc
|
||||
return lastChunkStartPos + lastChunkSize;
|
||||
} else { // last chunck is corrupt
|
||||
return lastChunkStartPos;
|
||||
}
|
||||
} catch (IOException e) {
|
||||
FsDatasetImpl.LOG.warn(e);
|
||||
return 0;
|
||||
} finally {
|
||||
IOUtils.closeStream(checksumIn);
|
||||
IOUtils.closeStream(blockIn);
|
||||
}
|
||||
}
|
||||
|
||||
void clearPath(File f) {
|
||||
finalizedDir.clearPath(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return currentDir.getAbsolutePath();
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
dfsUsage.shutdown();
|
||||
}
|
||||
}
|
|
@ -16,7 +16,7 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.util.HashMap;
|
||||
|
@ -29,29 +29,25 @@ import java.util.concurrent.TimeUnit;
|
|||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
||||
|
||||
/*
|
||||
/**
|
||||
* This class is a container of multiple thread pools, each for a volume,
|
||||
* so that we can schedule async disk operations easily.
|
||||
*
|
||||
* Examples of async disk operations are deletion of block files in FSDataset.
|
||||
* Examples of async disk operations are deletion of block files.
|
||||
* We don't want to create a new thread for each of the deletion request, and
|
||||
* we don't want to do all deletions in the heartbeat thread since deletion
|
||||
* can be slow, and we don't want to use a single thread pool because that
|
||||
* is inefficient when we have more than 1 volume. AsyncDiskService is the
|
||||
* solution for these.
|
||||
*
|
||||
* This class is used inside FSDataset.
|
||||
*
|
||||
* In the future, we should extract AsyncDiskService and put it into common.
|
||||
* The FSDataset-specific logic should reside here.
|
||||
* This class and {@link org.apache.hadoop.util.AsyncDiskService} are similar.
|
||||
* They should be combined.
|
||||
*/
|
||||
class FSDatasetAsyncDiskService {
|
||||
|
||||
final FSDataset dataset;
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(FSDatasetAsyncDiskService.class);
|
||||
class FsDatasetAsyncDiskService {
|
||||
public static final Log LOG = LogFactory.getLog(FsDatasetAsyncDiskService.class);
|
||||
|
||||
// ThreadPool core pool size
|
||||
private static final int CORE_THREADS_PER_VOLUME = 1;
|
||||
|
@ -60,9 +56,8 @@ class FSDatasetAsyncDiskService {
|
|||
// ThreadPool keep-alive time for threads over core pool size
|
||||
private static final long THREADS_KEEP_ALIVE_SECONDS = 60;
|
||||
|
||||
private final ThreadGroup threadGroup = new ThreadGroup("async disk service");
|
||||
|
||||
private HashMap<File, ThreadPoolExecutor> executors
|
||||
private final DataNode datanode;
|
||||
private Map<File, ThreadPoolExecutor> executors
|
||||
= new HashMap<File, ThreadPoolExecutor>();
|
||||
|
||||
/**
|
||||
|
@ -74,8 +69,10 @@ class FSDatasetAsyncDiskService {
|
|||
*
|
||||
* @param volumes The roots of the data volumes.
|
||||
*/
|
||||
FSDatasetAsyncDiskService(FSDataset dataset, File[] volumes) {
|
||||
this.dataset = dataset;
|
||||
FsDatasetAsyncDiskService(DataNode datanode, File[] volumes) {
|
||||
this.datanode = datanode;
|
||||
|
||||
final ThreadGroup threadGroup = new ThreadGroup(getClass().getSimpleName());
|
||||
// Create one ThreadPool per volume
|
||||
for (int v = 0 ; v < volumes.length; v++) {
|
||||
final File vol = volumes[v];
|
||||
|
@ -136,16 +133,12 @@ class FSDatasetAsyncDiskService {
|
|||
* tasks to finish.
|
||||
*/
|
||||
synchronized void shutdown() {
|
||||
|
||||
if (executors == null) {
|
||||
|
||||
LOG.warn("AsyncDiskService has already shut down.");
|
||||
|
||||
} else {
|
||||
LOG.info("Shutting down all async disk service threads...");
|
||||
|
||||
for (Map.Entry<File, ThreadPoolExecutor> e
|
||||
: executors.entrySet()) {
|
||||
for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
|
||||
e.getValue().shutdown();
|
||||
}
|
||||
// clear the executor map so that calling execute again will fail.
|
||||
|
@ -159,11 +152,11 @@ class FSDatasetAsyncDiskService {
|
|||
* Delete the block file and meta file from the disk asynchronously, adjust
|
||||
* dfsUsed statistics accordingly.
|
||||
*/
|
||||
void deleteAsync(FSDataset.FSVolume volume, File blockFile, File metaFile,
|
||||
void deleteAsync(FsVolumeImpl volume, File blockFile, File metaFile,
|
||||
ExtendedBlock block) {
|
||||
DataNode.LOG.info("Scheduling block " + block.getLocalBlock().toString()
|
||||
LOG.info("Scheduling block " + block.getLocalBlock()
|
||||
+ " file " + blockFile + " for deletion");
|
||||
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(dataset,
|
||||
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
|
||||
volume, blockFile, metaFile, block);
|
||||
execute(volume.getCurrentDir(), deletionTask);
|
||||
}
|
||||
|
@ -171,51 +164,43 @@ class FSDatasetAsyncDiskService {
|
|||
/** A task for deleting a block file and its associated meta file, as well
|
||||
* as decrement the dfs usage of the volume.
|
||||
*/
|
||||
static class ReplicaFileDeleteTask implements Runnable {
|
||||
final FSDataset dataset;
|
||||
final FSDataset.FSVolume volume;
|
||||
class ReplicaFileDeleteTask implements Runnable {
|
||||
final FsVolumeImpl volume;
|
||||
final File blockFile;
|
||||
final File metaFile;
|
||||
final ExtendedBlock block;
|
||||
|
||||
ReplicaFileDeleteTask(FSDataset dataset, FSDataset.FSVolume volume, File blockFile,
|
||||
ReplicaFileDeleteTask(FsVolumeImpl volume, File blockFile,
|
||||
File metaFile, ExtendedBlock block) {
|
||||
this.dataset = dataset;
|
||||
this.volume = volume;
|
||||
this.blockFile = blockFile;
|
||||
this.metaFile = metaFile;
|
||||
this.block = block;
|
||||
}
|
||||
|
||||
FSDataset.FSVolume getVolume() {
|
||||
return volume;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
// Called in AsyncDiskService.execute for displaying error messages.
|
||||
return "deletion of block " + block.getBlockPoolId() + " "
|
||||
+ block.getLocalBlock().toString() + " with block file " + blockFile
|
||||
+ block.getLocalBlock() + " with block file " + blockFile
|
||||
+ " and meta file " + metaFile + " from volume " + volume;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
long dfsBytes = blockFile.length() + metaFile.length();
|
||||
if ( !blockFile.delete() || ( !metaFile.delete() && metaFile.exists() ) ) {
|
||||
DataNode.LOG.warn("Unexpected error trying to delete block "
|
||||
+ block.getBlockPoolId() + " " + block.getLocalBlock().toString()
|
||||
if (!blockFile.delete() || (!metaFile.delete() && metaFile.exists())) {
|
||||
LOG.warn("Unexpected error trying to delete block "
|
||||
+ block.getBlockPoolId() + " " + block.getLocalBlock()
|
||||
+ " at file " + blockFile + ". Ignored.");
|
||||
} else {
|
||||
if(block.getLocalBlock().getNumBytes() != BlockCommand.NO_ACK){
|
||||
dataset.notifyNamenodeDeletedBlock(block);
|
||||
datanode.notifyNamenodeDeletedBlock(block);
|
||||
}
|
||||
volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
|
||||
DataNode.LOG.info("Deleted block " + block.getBlockPoolId() + " "
|
||||
+ block.getLocalBlock().toString() + " at file " + blockFile);
|
||||
LOG.info("Deleted block " + block.getBlockPoolId() + " "
|
||||
+ block.getLocalBlock() + " at file " + blockFile);
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
}
|
||||
}
|
|
@ -0,0 +1,36 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
|
||||
/**
|
||||
* A factory for creating {@link FsDatasetImpl} objects.
|
||||
*/
|
||||
public class FsDatasetFactory extends FsDatasetSpi.Factory<FsDatasetImpl> {
|
||||
@Override
|
||||
public FsDatasetImpl newInstance(DataNode datanode,
|
||||
DataStorage storage, Configuration conf) throws IOException {
|
||||
return new FsDatasetImpl(datanode, storage, conf);
|
||||
}
|
||||
}
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,107 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FilenameFilter;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||
|
||||
/** Utility methods. */
|
||||
@InterfaceAudience.Private
|
||||
public class FsDatasetUtil {
|
||||
static boolean isUnlinkTmpFile(File f) {
|
||||
return f.getName().endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX);
|
||||
}
|
||||
|
||||
static File getOrigFile(File unlinkTmpFile) {
|
||||
final String name = unlinkTmpFile.getName();
|
||||
if (!name.endsWith(DatanodeUtil.UNLINK_BLOCK_SUFFIX)) {
|
||||
throw new IllegalArgumentException("unlinkTmpFile=" + unlinkTmpFile
|
||||
+ " does not end with " + DatanodeUtil.UNLINK_BLOCK_SUFFIX);
|
||||
}
|
||||
final int n = name.length() - DatanodeUtil.UNLINK_BLOCK_SUFFIX.length();
|
||||
return new File(unlinkTmpFile.getParentFile(), name.substring(0, n));
|
||||
}
|
||||
|
||||
static File getMetaFile(File f, long gs) {
|
||||
return new File(f.getParent(),
|
||||
DatanodeUtil.getMetaName(f.getName(), gs));
|
||||
}
|
||||
|
||||
/** Find the corresponding meta data file from a given block file */
|
||||
static File findMetaFile(final File blockFile) throws IOException {
|
||||
final String prefix = blockFile.getName() + "_";
|
||||
final File parent = blockFile.getParentFile();
|
||||
final File[] matches = parent.listFiles(new FilenameFilter() {
|
||||
@Override
|
||||
public boolean accept(File dir, String name) {
|
||||
return dir.equals(parent) && name.startsWith(prefix)
|
||||
&& name.endsWith(Block.METADATA_EXTENSION);
|
||||
}
|
||||
});
|
||||
|
||||
if (matches == null || matches.length == 0) {
|
||||
throw new IOException("Meta file not found, blockFile=" + blockFile);
|
||||
}
|
||||
if (matches.length > 1) {
|
||||
throw new IOException("Found more than one meta files: "
|
||||
+ Arrays.asList(matches));
|
||||
}
|
||||
return matches[0];
|
||||
}
|
||||
|
||||
/**
|
||||
* Find the meta-file for the specified block file
|
||||
* and then return the generation stamp from the name of the meta-file.
|
||||
*/
|
||||
static long getGenerationStampFromFile(File[] listdir, File blockFile) {
|
||||
String blockName = blockFile.getName();
|
||||
for (int j = 0; j < listdir.length; j++) {
|
||||
String path = listdir[j].getName();
|
||||
if (!path.startsWith(blockName)) {
|
||||
continue;
|
||||
}
|
||||
if (blockFile == listdir[j]) {
|
||||
continue;
|
||||
}
|
||||
return Block.getGenerationStamp(listdir[j].getName());
|
||||
}
|
||||
FsDatasetImpl.LOG.warn("Block " + blockFile + " does not have a metafile!");
|
||||
return GenerationStamp.GRANDFATHER_GENERATION_STAMP;
|
||||
}
|
||||
|
||||
/** Find the corresponding meta data file from a given block file */
|
||||
static long parseGenerationStamp(File blockFile, File metaFile
|
||||
) throws IOException {
|
||||
final String metaname = metaFile.getName();
|
||||
final String gs = metaname.substring(blockFile.getName().length() + 1,
|
||||
metaname.length() - Block.METADATA_EXTENSION.length());
|
||||
try {
|
||||
return Long.parseLong(gs);
|
||||
} catch(NumberFormatException nfe) {
|
||||
throw new IOException("Failed to parse generation stamp: blockFile="
|
||||
+ blockFile + ", metaFile=" + metaFile, nfe);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,288 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Map.Entry;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.DF;
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
||||
/**
|
||||
* The underlying volume used to store replica.
|
||||
*
|
||||
* It uses the {@link FsDatasetImpl} object for synchronization.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class FsVolumeImpl implements FsVolumeSpi {
|
||||
private final FsDatasetImpl dataset;
|
||||
private final String storageID;
|
||||
private final Map<String, BlockPoolSlice> bpSlices
|
||||
= new HashMap<String, BlockPoolSlice>();
|
||||
private final File currentDir; // <StorageDirectory>/current
|
||||
private final DF usage;
|
||||
private final long reserved;
|
||||
|
||||
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
|
||||
Configuration conf) throws IOException {
|
||||
this.dataset = dataset;
|
||||
this.storageID = storageID;
|
||||
this.reserved = conf.getLong(
|
||||
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
|
||||
this.currentDir = currentDir;
|
||||
File parent = currentDir.getParentFile();
|
||||
this.usage = new DF(parent, conf);
|
||||
}
|
||||
|
||||
File getCurrentDir() {
|
||||
return currentDir;
|
||||
}
|
||||
|
||||
File getRbwDir(String bpid) throws IOException {
|
||||
return getBlockPoolSlice(bpid).getRbwDir();
|
||||
}
|
||||
|
||||
void decDfsUsed(String bpid, long value) {
|
||||
synchronized(dataset) {
|
||||
BlockPoolSlice bp = bpSlices.get(bpid);
|
||||
if (bp != null) {
|
||||
bp.decDfsUsed(value);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
long getDfsUsed() throws IOException {
|
||||
long dfsUsed = 0;
|
||||
synchronized(dataset) {
|
||||
for(BlockPoolSlice s : bpSlices.values()) {
|
||||
dfsUsed += s.getDfsUsed();
|
||||
}
|
||||
}
|
||||
return dfsUsed;
|
||||
}
|
||||
|
||||
long getBlockPoolUsed(String bpid) throws IOException {
|
||||
return getBlockPoolSlice(bpid).getDfsUsed();
|
||||
}
|
||||
|
||||
/**
|
||||
* Calculate the capacity of the filesystem, after removing any
|
||||
* reserved capacity.
|
||||
* @return the unreserved number of bytes left in this filesystem. May be zero.
|
||||
*/
|
||||
long getCapacity() {
|
||||
long remaining = usage.getCapacity() - reserved;
|
||||
return remaining > 0 ? remaining : 0;
|
||||
}
|
||||
|
||||
@Override
|
||||
public long getAvailable() throws IOException {
|
||||
long remaining = getCapacity()-getDfsUsed();
|
||||
long available = usage.getAvailable();
|
||||
if (remaining > available) {
|
||||
remaining = available;
|
||||
}
|
||||
return (remaining > 0) ? remaining : 0;
|
||||
}
|
||||
|
||||
long getReserved(){
|
||||
return reserved;
|
||||
}
|
||||
|
||||
BlockPoolSlice getBlockPoolSlice(String bpid) throws IOException {
|
||||
BlockPoolSlice bp = bpSlices.get(bpid);
|
||||
if (bp == null) {
|
||||
throw new IOException("block pool " + bpid + " is not found");
|
||||
}
|
||||
return bp;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPath(String bpid) throws IOException {
|
||||
return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
|
||||
}
|
||||
|
||||
@Override
|
||||
public File getFinalizedDir(String bpid) throws IOException {
|
||||
return getBlockPoolSlice(bpid).getFinalizedDir();
|
||||
}
|
||||
|
||||
/**
|
||||
* Make a deep copy of the list of currently active BPIDs
|
||||
*/
|
||||
@Override
|
||||
public String[] getBlockPoolList() {
|
||||
return bpSlices.keySet().toArray(new String[bpSlices.keySet().size()]);
|
||||
}
|
||||
|
||||
/**
|
||||
* Temporary files. They get moved to the finalized block directory when
|
||||
* the block is finalized.
|
||||
*/
|
||||
File createTmpFile(String bpid, Block b) throws IOException {
|
||||
return getBlockPoolSlice(bpid).createTmpFile(b);
|
||||
}
|
||||
|
||||
/**
|
||||
* RBW files. They get moved to the finalized block directory when
|
||||
* the block is finalized.
|
||||
*/
|
||||
File createRbwFile(String bpid, Block b) throws IOException {
|
||||
return getBlockPoolSlice(bpid).createRbwFile(b);
|
||||
}
|
||||
|
||||
File addBlock(String bpid, Block b, File f) throws IOException {
|
||||
return getBlockPoolSlice(bpid).addBlock(b, f);
|
||||
}
|
||||
|
||||
void checkDirs() throws DiskErrorException {
|
||||
// TODO:FEDERATION valid synchronization
|
||||
for(BlockPoolSlice s : bpSlices.values()) {
|
||||
s.checkDirs();
|
||||
}
|
||||
}
|
||||
|
||||
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
|
||||
for(BlockPoolSlice s : bpSlices.values()) {
|
||||
s.getVolumeMap(volumeMap);
|
||||
}
|
||||
}
|
||||
|
||||
void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
|
||||
getBlockPoolSlice(bpid).getVolumeMap(volumeMap);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add replicas under the given directory to the volume map
|
||||
* @param volumeMap the replicas map
|
||||
* @param dir an input directory
|
||||
* @param isFinalized true if the directory has finalized replicas;
|
||||
* false if the directory has rbw replicas
|
||||
* @throws IOException
|
||||
*/
|
||||
void addToReplicasMap(String bpid, ReplicaMap volumeMap,
|
||||
File dir, boolean isFinalized) throws IOException {
|
||||
BlockPoolSlice bp = getBlockPoolSlice(bpid);
|
||||
// TODO move this up
|
||||
// dfsUsage.incDfsUsed(b.getNumBytes()+metaFile.length());
|
||||
bp.addToReplicasMap(volumeMap, dir, isFinalized);
|
||||
}
|
||||
|
||||
void clearPath(String bpid, File f) throws IOException {
|
||||
getBlockPoolSlice(bpid).clearPath(f);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return currentDir.getAbsolutePath();
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
|
||||
for (Entry<String, BlockPoolSlice> entry : set) {
|
||||
entry.getValue().shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
void addBlockPool(String bpid, Configuration conf) throws IOException {
|
||||
File bpdir = new File(currentDir, bpid);
|
||||
BlockPoolSlice bp = new BlockPoolSlice(bpid, this, bpdir, conf);
|
||||
bpSlices.put(bpid, bp);
|
||||
}
|
||||
|
||||
void shutdownBlockPool(String bpid) {
|
||||
BlockPoolSlice bp = bpSlices.get(bpid);
|
||||
if (bp != null) {
|
||||
bp.shutdown();
|
||||
}
|
||||
bpSlices.remove(bpid);
|
||||
}
|
||||
|
||||
boolean isBPDirEmpty(String bpid) throws IOException {
|
||||
File volumeCurrentDir = this.getCurrentDir();
|
||||
File bpDir = new File(volumeCurrentDir, bpid);
|
||||
File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
|
||||
File finalizedDir = new File(bpCurrentDir,
|
||||
DataStorage.STORAGE_DIR_FINALIZED);
|
||||
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
|
||||
if (finalizedDir.exists() && FileUtil.list(finalizedDir).length != 0) {
|
||||
return false;
|
||||
}
|
||||
if (rbwDir.exists() && FileUtil.list(rbwDir).length != 0) {
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
void deleteBPDirectories(String bpid, boolean force) throws IOException {
|
||||
File volumeCurrentDir = this.getCurrentDir();
|
||||
File bpDir = new File(volumeCurrentDir, bpid);
|
||||
if (!bpDir.isDirectory()) {
|
||||
// nothing to be deleted
|
||||
return;
|
||||
}
|
||||
File tmpDir = new File(bpDir, DataStorage.STORAGE_DIR_TMP);
|
||||
File bpCurrentDir = new File(bpDir, DataStorage.STORAGE_DIR_CURRENT);
|
||||
File finalizedDir = new File(bpCurrentDir,
|
||||
DataStorage.STORAGE_DIR_FINALIZED);
|
||||
File rbwDir = new File(bpCurrentDir, DataStorage.STORAGE_DIR_RBW);
|
||||
if (force) {
|
||||
FileUtil.fullyDelete(bpDir);
|
||||
} else {
|
||||
if (!rbwDir.delete()) {
|
||||
throw new IOException("Failed to delete " + rbwDir);
|
||||
}
|
||||
if (!finalizedDir.delete()) {
|
||||
throw new IOException("Failed to delete " + finalizedDir);
|
||||
}
|
||||
FileUtil.fullyDelete(tmpDir);
|
||||
for (File f : FileUtil.listFiles(bpCurrentDir)) {
|
||||
if (!f.delete()) {
|
||||
throw new IOException("Failed to delete " + f);
|
||||
}
|
||||
}
|
||||
if (!bpCurrentDir.delete()) {
|
||||
throw new IOException("Failed to delete " + bpCurrentDir);
|
||||
}
|
||||
for (File f : FileUtil.listFiles(bpDir)) {
|
||||
if (!f.delete()) {
|
||||
throw new IOException("Failed to delete " + f);
|
||||
}
|
||||
}
|
||||
if (!bpDir.delete()) {
|
||||
throw new IOException("Failed to delete " + bpDir);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
String getStorageID() {
|
||||
return storageID;
|
||||
}
|
||||
}
|
|
@ -0,0 +1,172 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
||||
class FsVolumeList {
|
||||
/**
|
||||
* Read access to this unmodifiable list is not synchronized.
|
||||
* This list is replaced on modification holding "this" lock.
|
||||
*/
|
||||
volatile List<FsVolumeImpl> volumes = null;
|
||||
|
||||
private final VolumeChoosingPolicy<FsVolumeImpl> blockChooser;
|
||||
private volatile int numFailedVolumes;
|
||||
|
||||
FsVolumeList(List<FsVolumeImpl> volumes, int failedVols,
|
||||
VolumeChoosingPolicy<FsVolumeImpl> blockChooser) {
|
||||
this.volumes = Collections.unmodifiableList(volumes);
|
||||
this.blockChooser = blockChooser;
|
||||
this.numFailedVolumes = failedVols;
|
||||
}
|
||||
|
||||
int numberOfFailedVolumes() {
|
||||
return numFailedVolumes;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get next volume. Synchronized to ensure {@link #curVolume} is updated
|
||||
* by a single thread and next volume is chosen with no concurrent
|
||||
* update to {@link #volumes}.
|
||||
* @param blockSize free space needed on the volume
|
||||
* @return next volume to store the block in.
|
||||
*/
|
||||
synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
|
||||
return blockChooser.chooseVolume(volumes, blockSize);
|
||||
}
|
||||
|
||||
long getDfsUsed() throws IOException {
|
||||
long dfsUsed = 0L;
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
dfsUsed += v.getDfsUsed();
|
||||
}
|
||||
return dfsUsed;
|
||||
}
|
||||
|
||||
long getBlockPoolUsed(String bpid) throws IOException {
|
||||
long dfsUsed = 0L;
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
dfsUsed += v.getBlockPoolUsed(bpid);
|
||||
}
|
||||
return dfsUsed;
|
||||
}
|
||||
|
||||
long getCapacity() {
|
||||
long capacity = 0L;
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
capacity += v.getCapacity();
|
||||
}
|
||||
return capacity;
|
||||
}
|
||||
|
||||
long getRemaining() throws IOException {
|
||||
long remaining = 0L;
|
||||
for (FsVolumeSpi vol : volumes) {
|
||||
remaining += vol.getAvailable();
|
||||
}
|
||||
return remaining;
|
||||
}
|
||||
|
||||
void getVolumeMap(ReplicaMap volumeMap) throws IOException {
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
v.getVolumeMap(volumeMap);
|
||||
}
|
||||
}
|
||||
|
||||
void getVolumeMap(String bpid, ReplicaMap volumeMap) throws IOException {
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
v.getVolumeMap(bpid, volumeMap);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls {@link FsVolumeImpl#checkDirs()} on each volume, removing any
|
||||
* volumes from the active list that result in a DiskErrorException.
|
||||
*
|
||||
* This method is synchronized to allow only one instance of checkDirs()
|
||||
* call
|
||||
* @return list of all the removed volumes.
|
||||
*/
|
||||
synchronized List<FsVolumeImpl> checkDirs() {
|
||||
ArrayList<FsVolumeImpl> removedVols = null;
|
||||
|
||||
// Make a copy of volumes for performing modification
|
||||
final List<FsVolumeImpl> volumeList = new ArrayList<FsVolumeImpl>(volumes);
|
||||
|
||||
for(Iterator<FsVolumeImpl> i = volumeList.iterator(); i.hasNext(); ) {
|
||||
final FsVolumeImpl fsv = i.next();
|
||||
try {
|
||||
fsv.checkDirs();
|
||||
} catch (DiskErrorException e) {
|
||||
FsDatasetImpl.LOG.warn("Removing failed volume " + fsv + ": ",e);
|
||||
if (removedVols == null) {
|
||||
removedVols = new ArrayList<FsVolumeImpl>(1);
|
||||
}
|
||||
removedVols.add(fsv);
|
||||
fsv.shutdown();
|
||||
i.remove(); // Remove the volume
|
||||
numFailedVolumes++;
|
||||
}
|
||||
}
|
||||
|
||||
if (removedVols != null && removedVols.size() > 0) {
|
||||
// Replace volume list
|
||||
volumes = Collections.unmodifiableList(volumeList);
|
||||
FsDatasetImpl.LOG.info("Completed checkDirs. Removed " + removedVols.size()
|
||||
+ " volumes. Current volumes: " + this);
|
||||
}
|
||||
|
||||
return removedVols;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return volumes.toString();
|
||||
}
|
||||
|
||||
|
||||
void addBlockPool(String bpid, Configuration conf) throws IOException {
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
v.addBlockPool(bpid, conf);
|
||||
}
|
||||
}
|
||||
|
||||
void removeBlockPool(String bpid) {
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
v.shutdownBlockPool(bpid);
|
||||
}
|
||||
}
|
||||
|
||||
void shutdown() {
|
||||
for (FsVolumeImpl volume : volumes) {
|
||||
if(volume != null) {
|
||||
volume.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -0,0 +1,228 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.fs.FileUtil;
|
||||
import org.apache.hadoop.hdfs.DFSUtil;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataStorage;
|
||||
import org.apache.hadoop.util.DiskChecker;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
|
||||
|
||||
/**
|
||||
* A node type that can be built into a tree reflecting the
|
||||
* hierarchy of replicas on the local disk.
|
||||
*/
|
||||
class LDir {
|
||||
final File dir;
|
||||
final int maxBlocksPerDir;
|
||||
|
||||
private int numBlocks = 0;
|
||||
private LDir[] children = null;
|
||||
private int lastChildIdx = 0;
|
||||
|
||||
LDir(File dir, int maxBlocksPerDir) throws IOException {
|
||||
this.dir = dir;
|
||||
this.maxBlocksPerDir = maxBlocksPerDir;
|
||||
|
||||
if (!dir.exists()) {
|
||||
if (!dir.mkdirs()) {
|
||||
throw new IOException("Failed to mkdirs " + dir);
|
||||
}
|
||||
} else {
|
||||
File[] files = FileUtil.listFiles(dir);
|
||||
List<LDir> dirList = new ArrayList<LDir>();
|
||||
for (int idx = 0; idx < files.length; idx++) {
|
||||
if (files[idx].isDirectory()) {
|
||||
dirList.add(new LDir(files[idx], maxBlocksPerDir));
|
||||
} else if (Block.isBlockFilename(files[idx])) {
|
||||
numBlocks++;
|
||||
}
|
||||
}
|
||||
if (dirList.size() > 0) {
|
||||
children = dirList.toArray(new LDir[dirList.size()]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
File addBlock(Block b, File src) throws IOException {
|
||||
//First try without creating subdirectories
|
||||
File file = addBlock(b, src, false, false);
|
||||
return (file != null) ? file : addBlock(b, src, true, true);
|
||||
}
|
||||
|
||||
private File addBlock(Block b, File src, boolean createOk, boolean resetIdx
|
||||
) throws IOException {
|
||||
if (numBlocks < maxBlocksPerDir) {
|
||||
final File dest = FsDatasetImpl.moveBlockFiles(b, src, dir);
|
||||
numBlocks += 1;
|
||||
return dest;
|
||||
}
|
||||
|
||||
if (lastChildIdx < 0 && resetIdx) {
|
||||
//reset so that all children will be checked
|
||||
lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
|
||||
}
|
||||
|
||||
if (lastChildIdx >= 0 && children != null) {
|
||||
//Check if any child-tree has room for a block.
|
||||
for (int i=0; i < children.length; i++) {
|
||||
int idx = (lastChildIdx + i)%children.length;
|
||||
File file = children[idx].addBlock(b, src, false, resetIdx);
|
||||
if (file != null) {
|
||||
lastChildIdx = idx;
|
||||
return file;
|
||||
}
|
||||
}
|
||||
lastChildIdx = -1;
|
||||
}
|
||||
|
||||
if (!createOk) {
|
||||
return null;
|
||||
}
|
||||
|
||||
if (children == null || children.length == 0) {
|
||||
children = new LDir[maxBlocksPerDir];
|
||||
for (int idx = 0; idx < maxBlocksPerDir; idx++) {
|
||||
final File sub = new File(dir, DataStorage.BLOCK_SUBDIR_PREFIX+idx);
|
||||
children[idx] = new LDir(sub, maxBlocksPerDir);
|
||||
}
|
||||
}
|
||||
|
||||
//now pick a child randomly for creating a new set of subdirs.
|
||||
lastChildIdx = DFSUtil.getRandom().nextInt(children.length);
|
||||
return children[ lastChildIdx ].addBlock(b, src, true, false);
|
||||
}
|
||||
|
||||
void getVolumeMap(String bpid, ReplicaMap volumeMap, FsVolumeImpl volume
|
||||
) throws IOException {
|
||||
if (children != null) {
|
||||
for (int i = 0; i < children.length; i++) {
|
||||
children[i].getVolumeMap(bpid, volumeMap, volume);
|
||||
}
|
||||
}
|
||||
|
||||
recoverTempUnlinkedBlock();
|
||||
volume.addToReplicasMap(bpid, volumeMap, dir, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Recover unlinked tmp files on datanode restart. If the original block
|
||||
* does not exist, then the tmp file is renamed to be the
|
||||
* original file name; otherwise the tmp file is deleted.
|
||||
*/
|
||||
private void recoverTempUnlinkedBlock() throws IOException {
|
||||
File files[] = FileUtil.listFiles(dir);
|
||||
for (File file : files) {
|
||||
if (!FsDatasetUtil.isUnlinkTmpFile(file)) {
|
||||
continue;
|
||||
}
|
||||
File blockFile = FsDatasetUtil.getOrigFile(file);
|
||||
if (blockFile.exists()) {
|
||||
// If the original block file still exists, then no recovery is needed.
|
||||
if (!file.delete()) {
|
||||
throw new IOException("Unable to cleanup unlinked tmp file " + file);
|
||||
}
|
||||
} else {
|
||||
if (!file.renameTo(blockFile)) {
|
||||
throw new IOException("Unable to cleanup detached file " + file);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* check if a data diretory is healthy
|
||||
* @throws DiskErrorException
|
||||
*/
|
||||
void checkDirTree() throws DiskErrorException {
|
||||
DiskChecker.checkDir(dir);
|
||||
|
||||
if (children != null) {
|
||||
for (int i = 0; i < children.length; i++) {
|
||||
children[i].checkDirTree();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void clearPath(File f) {
|
||||
String root = dir.getAbsolutePath();
|
||||
String dir = f.getAbsolutePath();
|
||||
if (dir.startsWith(root)) {
|
||||
String[] dirNames = dir.substring(root.length()).
|
||||
split(File.separator + DataStorage.BLOCK_SUBDIR_PREFIX);
|
||||
if (clearPath(f, dirNames, 1))
|
||||
return;
|
||||
}
|
||||
clearPath(f, null, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* dirNames is an array of string integers derived from
|
||||
* usual directory structure data/subdirN/subdirXY/subdirM ...
|
||||
* If dirName array is non-null, we only check the child at
|
||||
* the children[dirNames[idx]]. This avoids iterating over
|
||||
* children in common case. If directory structure changes
|
||||
* in later versions, we need to revisit this.
|
||||
*/
|
||||
private boolean clearPath(File f, String[] dirNames, int idx) {
|
||||
if ((dirNames == null || idx == dirNames.length) &&
|
||||
dir.compareTo(f) == 0) {
|
||||
numBlocks--;
|
||||
return true;
|
||||
}
|
||||
|
||||
if (dirNames != null) {
|
||||
//guess the child index from the directory name
|
||||
if (idx > (dirNames.length - 1) || children == null) {
|
||||
return false;
|
||||
}
|
||||
int childIdx;
|
||||
try {
|
||||
childIdx = Integer.parseInt(dirNames[idx]);
|
||||
} catch (NumberFormatException ignored) {
|
||||
// layout changed? we could print a warning.
|
||||
return false;
|
||||
}
|
||||
return (childIdx >= 0 && childIdx < children.length) ?
|
||||
children[childIdx].clearPath(f, dirNames, idx+1) : false;
|
||||
}
|
||||
|
||||
//guesses failed. back to blind iteration.
|
||||
if (children != null) {
|
||||
for(int i=0; i < children.length; i++) {
|
||||
if (children[i].clearPath(f, null, -1)){
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "FSDir{dir=" + dir + ", children="
|
||||
+ (children == null ? null : Arrays.asList(children)) + "}";
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.util.Collection;
|
||||
import java.util.HashMap;
|
||||
|
@ -23,11 +23,12 @@ import java.util.Map;
|
|||
|
||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
|
||||
/**
|
||||
* Maintains the replicas map.
|
||||
* Maintains the replica map.
|
||||
*/
|
||||
class ReplicasMap {
|
||||
class ReplicaMap {
|
||||
// Object using which this class is synchronized
|
||||
private final Object mutex;
|
||||
|
||||
|
@ -35,7 +36,7 @@ class ReplicasMap {
|
|||
private Map<String, Map<Long, ReplicaInfo>> map =
|
||||
new HashMap<String, Map<Long, ReplicaInfo>>();
|
||||
|
||||
ReplicasMap(Object mutex) {
|
||||
ReplicaMap(Object mutex) {
|
||||
if (mutex == null) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Object to synchronize on cannot be null");
|
|
@ -0,0 +1,228 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.BufferedReader;
|
||||
import java.io.File;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.FileOutputStream;
|
||||
import java.io.FileReader;
|
||||
import java.io.IOException;
|
||||
import java.io.PrintStream;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataBlockScanner;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
|
||||
|
||||
class RollingLogsImpl implements RollingLogs {
|
||||
private static final String CURR_SUFFIX = ".curr";
|
||||
private static final String PREV_SUFFIX = ".prev";
|
||||
|
||||
static boolean isFilePresent(String dir, String filePrefix) {
|
||||
return new File(dir, filePrefix + CURR_SUFFIX).exists() ||
|
||||
new File(dir, filePrefix + PREV_SUFFIX).exists();
|
||||
}
|
||||
|
||||
private final File curr;
|
||||
private final File prev;
|
||||
private PrintStream out; //require synchronized access
|
||||
|
||||
private Appender appender = new Appender() {
|
||||
@Override
|
||||
public Appendable append(CharSequence csq) {
|
||||
synchronized(RollingLogsImpl.this) {
|
||||
if (out == null) {
|
||||
throw new IllegalStateException(RollingLogsImpl.this
|
||||
+ " is not yet opened.");
|
||||
}
|
||||
out.print(csq);
|
||||
}
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Appendable append(char c) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public Appendable append(CharSequence csq, int start, int end) {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
synchronized(RollingLogsImpl.this) {
|
||||
if (out != null) {
|
||||
out.close();
|
||||
out = null;
|
||||
}
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
private final AtomicInteger numReaders = new AtomicInteger();
|
||||
|
||||
RollingLogsImpl(String dir, String filePrefix) throws FileNotFoundException{
|
||||
curr = new File(dir, filePrefix + CURR_SUFFIX);
|
||||
prev = new File(dir, filePrefix + PREV_SUFFIX);
|
||||
out = new PrintStream(new FileOutputStream(curr, true));
|
||||
}
|
||||
|
||||
@Override
|
||||
public Reader iterator(boolean skipPrevFile) throws IOException {
|
||||
numReaders.incrementAndGet();
|
||||
return new Reader(skipPrevFile);
|
||||
}
|
||||
|
||||
@Override
|
||||
public Appender appender() {
|
||||
return appender;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean roll() throws IOException {
|
||||
if (numReaders.get() > 0) {
|
||||
return false;
|
||||
}
|
||||
if (!prev.delete() && prev.exists()) {
|
||||
throw new IOException("Failed to delete " + prev);
|
||||
}
|
||||
|
||||
synchronized(this) {
|
||||
appender.close();
|
||||
final boolean renamed = curr.renameTo(prev);
|
||||
out = new PrintStream(new FileOutputStream(curr, true));
|
||||
if (!renamed) {
|
||||
throw new IOException("Failed to rename " + curr + " to " + prev);
|
||||
}
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return curr.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* This is used to read the lines in order.
|
||||
* If the data is not read completely (i.e, untill hasNext() returns
|
||||
* false), it needs to be explicitly
|
||||
*/
|
||||
private class Reader implements RollingLogs.LineIterator {
|
||||
private File file;
|
||||
private BufferedReader reader;
|
||||
private String line;
|
||||
private boolean closed = false;
|
||||
|
||||
private Reader(boolean skipPrevFile) throws IOException {
|
||||
reader = null;
|
||||
file = skipPrevFile? curr : prev;
|
||||
readNext();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isPrevious() {
|
||||
return file == prev;
|
||||
}
|
||||
|
||||
private boolean openFile() throws IOException {
|
||||
|
||||
for(int i=0; i<2; i++) {
|
||||
if (reader != null || i > 0) {
|
||||
// move to next file
|
||||
file = isPrevious()? curr : null;
|
||||
}
|
||||
if (file == null) {
|
||||
return false;
|
||||
}
|
||||
if (file.exists()) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (reader != null ) {
|
||||
reader.close();
|
||||
reader = null;
|
||||
}
|
||||
|
||||
reader = new BufferedReader(new FileReader(file));
|
||||
return true;
|
||||
}
|
||||
|
||||
// read next line if possible.
|
||||
private void readNext() throws IOException {
|
||||
line = null;
|
||||
try {
|
||||
if (reader != null && (line = reader.readLine()) != null) {
|
||||
return;
|
||||
}
|
||||
if (line == null) {
|
||||
// move to the next file.
|
||||
if (openFile()) {
|
||||
readNext();
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
if (!hasNext()) {
|
||||
close();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean hasNext() {
|
||||
return line != null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String next() {
|
||||
String curLine = line;
|
||||
try {
|
||||
readNext();
|
||||
} catch (IOException e) {
|
||||
DataBlockScanner.LOG.warn("Failed to read next line.", e);
|
||||
}
|
||||
return curLine;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void remove() {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
if (!closed) {
|
||||
try {
|
||||
if (reader != null) {
|
||||
reader.close();
|
||||
}
|
||||
} finally {
|
||||
file = null;
|
||||
reader = null;
|
||||
closed = true;
|
||||
final int n = numReaders.decrementAndGet();
|
||||
assert(n >= 0);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
|
@ -28,7 +28,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
|||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.TestInterDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.TestInterDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.namenode.LeaseManager;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
|
||||
|
||||
|
|
|
@ -22,12 +22,16 @@ package org.apache.hadoop.hdfs.server.datanode;
|
|||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||
import org.mockito.Mockito;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
|
@ -100,6 +104,19 @@ public class DataNodeTestUtils {
|
|||
return spy;
|
||||
}
|
||||
|
||||
public static InterDatanodeProtocol createInterDatanodeProtocolProxy(
|
||||
DataNode dn, DatanodeID datanodeid, final Configuration conf
|
||||
) throws IOException {
|
||||
return DataNode.createInterDataNodeProtocolProxy(datanodeid, conf,
|
||||
dn.getDnConf().socketTimeout);
|
||||
}
|
||||
|
||||
public static void shutdownBlockScanner(DataNode dn) {
|
||||
if (dn.blockScanner != null) {
|
||||
dn.blockScanner.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used for testing.
|
||||
* Examples are adding and deleting blocks directly.
|
||||
|
@ -111,26 +128,22 @@ public class DataNodeTestUtils {
|
|||
return dn.getFSDataset();
|
||||
}
|
||||
|
||||
public static FSDataset getFsDatasetImpl(DataNode dn) {
|
||||
return (FSDataset)dn.getFSDataset();
|
||||
}
|
||||
|
||||
public static File getFile(DataNode dn, String bpid, long bid) {
|
||||
return getFsDatasetImpl(dn).getFile(bpid, bid);
|
||||
return FsDatasetTestUtil.getFile(dn.getFSDataset(), bpid, bid);
|
||||
}
|
||||
|
||||
public static File getBlockFile(DataNode dn, String bpid, Block b
|
||||
) throws IOException {
|
||||
return getFsDatasetImpl(dn).getBlockFile(bpid, b);
|
||||
return FsDatasetTestUtil.getBlockFile(dn.getFSDataset(), bpid, b);
|
||||
}
|
||||
|
||||
public static boolean unlinkBlock(DataNode dn, ExtendedBlock block, int numLinks
|
||||
public static boolean unlinkBlock(DataNode dn, ExtendedBlock bk, int numLinks
|
||||
) throws IOException {
|
||||
return getFsDatasetImpl(dn).getReplicaInfo(block).unlinkBlock(numLinks);
|
||||
return FsDatasetTestUtil.unlinkBlock(dn.getFSDataset(), bk, numLinks);
|
||||
}
|
||||
|
||||
public static long getPendingAsyncDeletions(DataNode dn) {
|
||||
return getFsDatasetImpl(dn).asyncDiskService.countPendingDeletions();
|
||||
return FsDatasetTestUtil.getPendingAsyncDeletions(dn.getFSDataset());
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -142,6 +155,6 @@ public class DataNodeTestUtils {
|
|||
*/
|
||||
public static ReplicaInfo fetchReplicaInfo(final DataNode dn,
|
||||
final String bpid, final long blkId) {
|
||||
return getFsDatasetImpl(dn).fetchReplicaInfo(bpid, blkId);
|
||||
return FsDatasetTestUtil.fetchReplicaInfo(dn.getFSDataset(), bpid, blkId);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -613,14 +613,13 @@ public class TestBlockReport {
|
|||
|
||||
// Look about specified DN for the replica of the block from 1st DN
|
||||
final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
|
||||
final FSDataset dataset1 = (FSDataset)DataNodeTestUtils.getFSDataset(dn1);
|
||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
Replica r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
|
||||
Replica r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
|
||||
long start = System.currentTimeMillis();
|
||||
int count = 0;
|
||||
while (r == null) {
|
||||
waitTil(5);
|
||||
r = dataset1.fetchReplicaInfo(bpid, bl.getBlockId());
|
||||
r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
|
||||
long waiting_period = System.currentTimeMillis() - start;
|
||||
if (count++ % 100 == 0)
|
||||
if(LOG.isDebugEnabled()) {
|
||||
|
|
|
@ -37,6 +37,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|||
import org.apache.hadoop.hdfs.DFSTestUtil;
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
|
@ -376,7 +377,7 @@ public class TestDataNodeVolumeFailure {
|
|||
new FilenameFilter() {
|
||||
public boolean accept(File dir, String name) {
|
||||
return name.startsWith("blk_") &&
|
||||
name.endsWith(DatanodeUtil.METADATA_EXTENSION);
|
||||
name.endsWith(Block.METADATA_EXTENSION);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
|
|
@ -38,7 +38,9 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
|
||||
/**
|
||||
* Tests {@link DirectoryScanner} handling of differences
|
||||
|
@ -51,7 +53,7 @@ public class TestDirectoryScanner extends TestCase {
|
|||
|
||||
private MiniDFSCluster cluster;
|
||||
private String bpid;
|
||||
private FSDataset fds = null;
|
||||
private FsDatasetSpi<? extends FsVolumeSpi> fds = null;
|
||||
private DirectoryScanner scanner = null;
|
||||
private Random rand = new Random();
|
||||
private Random r = new Random();
|
||||
|
@ -72,7 +74,7 @@ public class TestDirectoryScanner extends TestCase {
|
|||
/** Truncate a block file */
|
||||
private long truncateBlockFile() throws IOException {
|
||||
synchronized (fds) {
|
||||
for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
|
||||
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
|
||||
File f = b.getBlockFile();
|
||||
File mf = b.getMetaFile();
|
||||
// Truncate a block file that has a corresponding metadata file
|
||||
|
@ -91,7 +93,7 @@ public class TestDirectoryScanner extends TestCase {
|
|||
/** Delete a block file */
|
||||
private long deleteBlockFile() {
|
||||
synchronized(fds) {
|
||||
for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
|
||||
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
|
||||
File f = b.getBlockFile();
|
||||
File mf = b.getMetaFile();
|
||||
// Delete a block file that has corresponding metadata file
|
||||
|
@ -107,7 +109,7 @@ public class TestDirectoryScanner extends TestCase {
|
|||
/** Delete block meta file */
|
||||
private long deleteMetaFile() {
|
||||
synchronized(fds) {
|
||||
for (ReplicaInfo b : fds.volumeMap.replicas(bpid)) {
|
||||
for (ReplicaInfo b : FsDatasetTestUtil.getReplicas(fds, bpid)) {
|
||||
File file = b.getMetaFile();
|
||||
// Delete a metadata file
|
||||
if (file.exists() && file.delete()) {
|
||||
|
@ -124,7 +126,7 @@ public class TestDirectoryScanner extends TestCase {
|
|||
long id = rand.nextLong();
|
||||
while (true) {
|
||||
id = rand.nextLong();
|
||||
if (fds.fetchReplicaInfo(bpid, id) == null) {
|
||||
if (FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, id) == null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -142,7 +144,7 @@ public class TestDirectoryScanner extends TestCase {
|
|||
|
||||
/** Create a block file in a random volume*/
|
||||
private long createBlockFile() throws IOException {
|
||||
List<FSVolume> volumes = fds.getVolumes();
|
||||
List<? extends FsVolumeSpi> volumes = fds.getVolumes();
|
||||
int index = rand.nextInt(volumes.size() - 1);
|
||||
long id = getFreeBlockId();
|
||||
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
|
||||
|
@ -155,7 +157,7 @@ public class TestDirectoryScanner extends TestCase {
|
|||
|
||||
/** Create a metafile in a random volume*/
|
||||
private long createMetaFile() throws IOException {
|
||||
List<FSVolume> volumes = fds.getVolumes();
|
||||
List<? extends FsVolumeSpi> volumes = fds.getVolumes();
|
||||
int index = rand.nextInt(volumes.size() - 1);
|
||||
long id = getFreeBlockId();
|
||||
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
|
||||
|
@ -168,7 +170,7 @@ public class TestDirectoryScanner extends TestCase {
|
|||
|
||||
/** Create block file and corresponding metafile in a rondom volume */
|
||||
private long createBlockMetaFile() throws IOException {
|
||||
List<FSVolume> volumes = fds.getVolumes();
|
||||
List<? extends FsVolumeSpi> volumes = fds.getVolumes();
|
||||
int index = rand.nextInt(volumes.size() - 1);
|
||||
long id = getFreeBlockId();
|
||||
File finalizedDir = volumes.get(index).getFinalizedDir(bpid);
|
||||
|
@ -228,8 +230,7 @@ public class TestDirectoryScanner extends TestCase {
|
|||
try {
|
||||
cluster.waitActive();
|
||||
bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
fds = (FSDataset)DataNodeTestUtils.getFSDataset(
|
||||
cluster.getDataNodes().get(0));
|
||||
fds = DataNodeTestUtils.getFSDataset(cluster.getDataNodes().get(0));
|
||||
CONF.setInt(DFSConfigKeys.DFS_DATANODE_DIRECTORYSCAN_THREADS_KEY,
|
||||
parallelism);
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
|
@ -348,12 +349,13 @@ public class TestDirectoryScanner extends TestCase {
|
|||
|
||||
private void verifyAddition(long blockId, long genStamp, long size) {
|
||||
final ReplicaInfo replicainfo;
|
||||
replicainfo = fds.fetchReplicaInfo(bpid, blockId);
|
||||
replicainfo = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
|
||||
assertNotNull(replicainfo);
|
||||
|
||||
// Added block has the same file as the one created by the test
|
||||
File file = new File(getBlockFile(blockId));
|
||||
assertEquals(file.getName(), fds.getFile(bpid, blockId).getName());
|
||||
assertEquals(file.getName(),
|
||||
FsDatasetTestUtil.getFile(fds, bpid, blockId).getName());
|
||||
|
||||
// Generation stamp is same as that of created file
|
||||
assertEquals(genStamp, replicainfo.getGenerationStamp());
|
||||
|
@ -364,12 +366,12 @@ public class TestDirectoryScanner extends TestCase {
|
|||
|
||||
private void verifyDeletion(long blockId) {
|
||||
// Ensure block does not exist in memory
|
||||
assertNull(fds.fetchReplicaInfo(bpid, blockId));
|
||||
assertNull(FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId));
|
||||
}
|
||||
|
||||
private void verifyGenStamp(long blockId, long genStamp) {
|
||||
final ReplicaInfo memBlock;
|
||||
memBlock = fds.fetchReplicaInfo(bpid, blockId);
|
||||
memBlock = FsDatasetTestUtil.fetchReplicaInfo(fds, bpid, blockId);
|
||||
assertNotNull(memBlock);
|
||||
assertEquals(genStamp, memBlock.getGenerationStamp());
|
||||
}
|
||||
|
|
|
@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
|
|||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetFactory;
|
||||
import org.apache.hadoop.util.DataChecksum;
|
||||
|
||||
/**
|
||||
|
@ -90,7 +91,7 @@ public class TestSimulatedFSDataset extends TestCase {
|
|||
public void testFSDatasetFactory() {
|
||||
final Configuration conf = new Configuration();
|
||||
FsDatasetSpi.Factory<?> f = FsDatasetSpi.Factory.getFactory(conf);
|
||||
assertEquals(FSDataset.Factory.class, f.getClass());
|
||||
assertEquals(FsDatasetFactory.class, f.getClass());
|
||||
assertFalse(f.isSimulated());
|
||||
|
||||
SimulatedFSDataset.setFactory(conf);
|
||||
|
@ -243,7 +244,7 @@ public class TestSimulatedFSDataset extends TestCase {
|
|||
}
|
||||
}
|
||||
|
||||
public void checkInvalidBlock(ExtendedBlock b) throws IOException {
|
||||
public void checkInvalidBlock(ExtendedBlock b) {
|
||||
final SimulatedFSDataset fsdataset = getSimulatedFSDataset();
|
||||
assertFalse(fsdataset.isValidBlock(b));
|
||||
try {
|
||||
|
|
|
@ -36,6 +36,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
|
|||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseProto;
|
||||
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.FsDatasetTestUtil;
|
||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||
import org.apache.log4j.Level;
|
||||
import org.junit.Assert;
|
||||
|
@ -58,8 +59,8 @@ public class TestTransferRbw {
|
|||
}
|
||||
private static ReplicaInPipeline getReplica(final DataNode datanode,
|
||||
final String bpid, final ReplicaState expectedState) throws InterruptedException {
|
||||
final FSDataset dataset = ((FSDataset)datanode.data);
|
||||
final Collection<ReplicaInfo> replicas = dataset.volumeMap.replicas(bpid);
|
||||
final Collection<ReplicaInfo> replicas = FsDatasetTestUtil.getReplicas(
|
||||
datanode.getFSDataset(), bpid);
|
||||
for(int i = 0; i < 5 && replicas.size() == 0; i++) {
|
||||
LOG.info("wait since replicas.size() == 0; i=" + i);
|
||||
Thread.sleep(1000);
|
||||
|
|
|
@ -0,0 +1,59 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Collection;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
|
||||
public class FsDatasetTestUtil {
|
||||
|
||||
public static File getFile(FsDatasetSpi<?> fsd, String bpid, long bid) {
|
||||
return ((FsDatasetImpl)fsd).getFile(bpid, bid);
|
||||
}
|
||||
|
||||
public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
|
||||
) throws IOException {
|
||||
return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
|
||||
}
|
||||
|
||||
public static boolean unlinkBlock(FsDatasetSpi<?> fsd,
|
||||
ExtendedBlock block, int numLinks) throws IOException {
|
||||
final ReplicaInfo info = ((FsDatasetImpl)fsd).getReplicaInfo(block);
|
||||
return info.unlinkBlock(numLinks);
|
||||
}
|
||||
|
||||
public static ReplicaInfo fetchReplicaInfo (final FsDatasetSpi<?> fsd,
|
||||
final String bpid, final long blockId) {
|
||||
return ((FsDatasetImpl)fsd).fetchReplicaInfo(bpid, blockId);
|
||||
}
|
||||
|
||||
public static long getPendingAsyncDeletions(FsDatasetSpi<?> fsd) {
|
||||
return ((FsDatasetImpl)fsd).asyncDiskService.countPendingDeletions();
|
||||
}
|
||||
|
||||
public static Collection<ReplicaInfo> getReplicas(FsDatasetSpi<?> fsd,
|
||||
String bpid) {
|
||||
return ((FsDatasetImpl)fsd).volumeMap.replicas(bpid);
|
||||
}
|
||||
}
|
|
@ -15,7 +15,7 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.FileInputStream;
|
||||
|
@ -36,7 +36,10 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DatanodeUtil;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
|
||||
import org.apache.hadoop.io.IOUtils;
|
||||
import org.junit.Assert;
|
||||
|
@ -98,8 +101,8 @@ public class TestDatanodeRestart {
|
|||
out.write(writeBuf);
|
||||
out.hflush();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
for (FsVolumeSpi v : dn.data.getVolumes()) {
|
||||
FSVolume volume = (FSVolume)v;
|
||||
for (FsVolumeSpi v : dataset(dn).getVolumes()) {
|
||||
final FsVolumeImpl volume = (FsVolumeImpl)v;
|
||||
File currentDir = volume.getCurrentDir().getParentFile().getParentFile();
|
||||
File rbwDir = new File(currentDir, "rbw");
|
||||
for (File file : rbwDir.listFiles()) {
|
||||
|
@ -114,7 +117,7 @@ public class TestDatanodeRestart {
|
|||
|
||||
// check volumeMap: one rwr replica
|
||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
ReplicasMap replicas = ((FSDataset)(dn.data)).volumeMap;
|
||||
ReplicaMap replicas = dataset(dn).volumeMap;
|
||||
Assert.assertEquals(1, replicas.size(bpid));
|
||||
ReplicaInfo replica = replicas.replicas(bpid).iterator().next();
|
||||
Assert.assertEquals(ReplicaState.RWR, replica.getState());
|
||||
|
@ -123,7 +126,7 @@ public class TestDatanodeRestart {
|
|||
} else {
|
||||
Assert.assertEquals(fileLen, replica.getNumBytes());
|
||||
}
|
||||
dn.data.invalidate(bpid, new Block[]{replica});
|
||||
dataset(dn).invalidate(bpid, new Block[]{replica});
|
||||
} finally {
|
||||
IOUtils.closeStream(out);
|
||||
if (fs.exists(src)) {
|
||||
|
@ -151,7 +154,7 @@ public class TestDatanodeRestart {
|
|||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
Iterator<ReplicaInfo> replicasItor =
|
||||
((FSDataset)dn.data).volumeMap.replicas(bpid).iterator();
|
||||
dataset(dn).volumeMap.replicas(bpid).iterator();
|
||||
ReplicaInfo replica = replicasItor.next();
|
||||
createUnlinkTmpFile(replica, true, true); // rename block file
|
||||
createUnlinkTmpFile(replica, false, true); // rename meta file
|
||||
|
@ -167,8 +170,7 @@ public class TestDatanodeRestart {
|
|||
dn = cluster.getDataNodes().get(0);
|
||||
|
||||
// check volumeMap: 4 finalized replica
|
||||
Collection<ReplicaInfo> replicas =
|
||||
((FSDataset)(dn.data)).volumeMap.replicas(bpid);
|
||||
Collection<ReplicaInfo> replicas = dataset(dn).volumeMap.replicas(bpid);
|
||||
Assert.assertEquals(4, replicas.size());
|
||||
replicasItor = replicas.iterator();
|
||||
while (replicasItor.hasNext()) {
|
||||
|
@ -180,6 +182,10 @@ public class TestDatanodeRestart {
|
|||
}
|
||||
}
|
||||
|
||||
private static FsDatasetImpl dataset(DataNode dn) {
|
||||
return (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
||||
}
|
||||
|
||||
private static void createUnlinkTmpFile(ReplicaInfo replicaInfo,
|
||||
boolean changeBlockFile,
|
||||
boolean isRename) throws IOException {
|
|
@ -15,22 +15,15 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
import java.net.InetSocketAddress;
|
||||
|
||||
import java.net.SocketTimeoutException;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -41,16 +34,28 @@ import org.apache.hadoop.hdfs.HdfsConfiguration;
|
|||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
|
||||
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
|
||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeID;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
|
||||
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
|
||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||
import org.apache.hadoop.io.LongWritable;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
|
@ -103,14 +108,14 @@ public class TestInterDatanodeProtocol {
|
|||
}
|
||||
|
||||
public static void checkMetaInfo(ExtendedBlock b, DataNode dn) throws IOException {
|
||||
Block metainfo = dn.data.getStoredBlock(b.getBlockPoolId(), b.getBlockId());
|
||||
Block metainfo = DataNodeTestUtils.getFSDataset(dn).getStoredBlock(
|
||||
b.getBlockPoolId(), b.getBlockId());
|
||||
Assert.assertEquals(b.getBlockId(), metainfo.getBlockId());
|
||||
Assert.assertEquals(b.getNumBytes(), metainfo.getNumBytes());
|
||||
}
|
||||
|
||||
public static LocatedBlock getLastLocatedBlock(
|
||||
ClientProtocol namenode, String src
|
||||
) throws IOException {
|
||||
ClientProtocol namenode, String src) throws IOException {
|
||||
//get block info for the last block
|
||||
LocatedBlocks locations = namenode.getBlockLocations(src, 0, Long.MAX_VALUE);
|
||||
List<LocatedBlock> blocks = locations.getLocatedBlocks();
|
||||
|
@ -148,13 +153,11 @@ public class TestInterDatanodeProtocol {
|
|||
|
||||
//connect to a data node
|
||||
DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
|
||||
InterDatanodeProtocol idp = DataNode.createInterDataNodeProtocolProxy(
|
||||
datanodeinfo[0], conf, datanode.getDnConf().socketTimeout);
|
||||
InterDatanodeProtocol idp = DataNodeTestUtils.createInterDatanodeProtocolProxy(
|
||||
datanode, datanodeinfo[0], conf);
|
||||
|
||||
//stop block scanner, so we could compare lastScanTime
|
||||
if (datanode.blockScanner != null) {
|
||||
datanode.blockScanner.shutdown();
|
||||
}
|
||||
DataNodeTestUtils.shutdownBlockScanner(datanode);
|
||||
|
||||
//verify BlockMetaDataInfo
|
||||
ExtendedBlock b = locatedblock.getBlock();
|
||||
|
@ -187,14 +190,14 @@ public class TestInterDatanodeProtocol {
|
|||
}
|
||||
|
||||
/** Test
|
||||
* {@link FSDataset#initReplicaRecovery(String, ReplicasMap, Block, long)}
|
||||
* {@link FsDatasetImpl#initReplicaRecovery(String, ReplicaMap, Block, long)}
|
||||
*/
|
||||
@Test
|
||||
public void testInitReplicaRecovery() throws IOException {
|
||||
final long firstblockid = 10000L;
|
||||
final long gs = 7777L;
|
||||
final long length = 22L;
|
||||
final ReplicasMap map = new ReplicasMap(this);
|
||||
final ReplicaMap map = new ReplicaMap(this);
|
||||
String bpid = "BP-TEST";
|
||||
final Block[] blocks = new Block[5];
|
||||
for(int i = 0; i < blocks.length; i++) {
|
||||
|
@ -208,7 +211,8 @@ public class TestInterDatanodeProtocol {
|
|||
final ReplicaInfo originalInfo = map.get(bpid, b);
|
||||
|
||||
final long recoveryid = gs + 1;
|
||||
final ReplicaRecoveryInfo recoveryInfo = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid);
|
||||
final ReplicaRecoveryInfo recoveryInfo = FsDatasetImpl.initReplicaRecovery(
|
||||
bpid, map, blocks[0], recoveryid);
|
||||
assertEquals(originalInfo, recoveryInfo);
|
||||
|
||||
final ReplicaUnderRecovery updatedInfo = (ReplicaUnderRecovery)map.get(bpid, b);
|
||||
|
@ -217,7 +221,7 @@ public class TestInterDatanodeProtocol {
|
|||
|
||||
//recover one more time
|
||||
final long recoveryid2 = gs + 2;
|
||||
final ReplicaRecoveryInfo recoveryInfo2 = FSDataset.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
|
||||
final ReplicaRecoveryInfo recoveryInfo2 = FsDatasetImpl.initReplicaRecovery(bpid, map, blocks[0], recoveryid2);
|
||||
assertEquals(originalInfo, recoveryInfo2);
|
||||
|
||||
final ReplicaUnderRecovery updatedInfo2 = (ReplicaUnderRecovery)map.get(bpid, b);
|
||||
|
@ -226,7 +230,7 @@ public class TestInterDatanodeProtocol {
|
|||
|
||||
//case RecoveryInProgressException
|
||||
try {
|
||||
FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
|
||||
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
|
||||
Assert.fail();
|
||||
}
|
||||
catch(RecoveryInProgressException ripe) {
|
||||
|
@ -237,7 +241,7 @@ public class TestInterDatanodeProtocol {
|
|||
{ // BlockRecoveryFI_01: replica not found
|
||||
final long recoveryid = gs + 1;
|
||||
final Block b = new Block(firstblockid - 1, length, gs);
|
||||
ReplicaRecoveryInfo r = FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
|
||||
ReplicaRecoveryInfo r = FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
|
||||
Assert.assertNull("Data-node should not have this replica.", r);
|
||||
}
|
||||
|
||||
|
@ -245,7 +249,7 @@ public class TestInterDatanodeProtocol {
|
|||
final long recoveryid = gs - 1;
|
||||
final Block b = new Block(firstblockid + 1, length, gs);
|
||||
try {
|
||||
FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
|
||||
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
|
||||
Assert.fail();
|
||||
}
|
||||
catch(IOException ioe) {
|
||||
|
@ -258,7 +262,7 @@ public class TestInterDatanodeProtocol {
|
|||
final long recoveryid = gs + 1;
|
||||
final Block b = new Block(firstblockid, length, gs+1);
|
||||
try {
|
||||
FSDataset.initReplicaRecovery(bpid, map, b, recoveryid);
|
||||
FsDatasetImpl.initReplicaRecovery(bpid, map, b, recoveryid);
|
||||
fail("InitReplicaRecovery should fail because replica's " +
|
||||
"gs is less than the block's gs");
|
||||
} catch (IOException e) {
|
||||
|
@ -270,7 +274,7 @@ public class TestInterDatanodeProtocol {
|
|||
|
||||
/**
|
||||
* Test for
|
||||
* {@link FSDataset#updateReplicaUnderRecovery(ExtendedBlock, long, long)}
|
||||
* {@link FsDatasetImpl#updateReplicaUnderRecovery(ExtendedBlock, long, long)}
|
||||
* */
|
||||
@Test
|
||||
public void testUpdateReplicaUnderRecovery() throws IOException {
|
||||
|
@ -296,22 +300,22 @@ public class TestInterDatanodeProtocol {
|
|||
//get DataNode and FSDataset objects
|
||||
final DataNode datanode = cluster.getDataNode(datanodeinfo[0].getIpcPort());
|
||||
Assert.assertTrue(datanode != null);
|
||||
Assert.assertTrue(datanode.data instanceof FSDataset);
|
||||
final FSDataset fsdataset = (FSDataset)datanode.data;
|
||||
|
||||
//initReplicaRecovery
|
||||
final ExtendedBlock b = locatedblock.getBlock();
|
||||
final long recoveryid = b.getGenerationStamp() + 1;
|
||||
final long newlength = b.getNumBytes() - 1;
|
||||
final FsDatasetSpi<?> fsdataset = DataNodeTestUtils.getFSDataset(datanode);
|
||||
final ReplicaRecoveryInfo rri = fsdataset.initReplicaRecovery(
|
||||
new RecoveringBlock(b, null, recoveryid));
|
||||
|
||||
//check replica
|
||||
final ReplicaInfo replica = fsdataset.fetchReplicaInfo(bpid, b.getBlockId());
|
||||
final ReplicaInfo replica = FsDatasetTestUtil.fetchReplicaInfo(
|
||||
fsdataset, bpid, b.getBlockId());
|
||||
Assert.assertEquals(ReplicaState.RUR, replica.getState());
|
||||
|
||||
//check meta data before update
|
||||
FSDataset.checkReplicaFiles(replica);
|
||||
FsDatasetImpl.checkReplicaFiles(replica);
|
||||
|
||||
//case "THIS IS NOT SUPPOSED TO HAPPEN"
|
||||
//with (block length) != (stored replica's on disk length).
|
|
@ -15,21 +15,23 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertNull;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.fsdataset.impl.ReplicaMap;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* Unit test for ReplicasMap class
|
||||
*/
|
||||
public class TestReplicasMap {
|
||||
private final ReplicasMap map = new ReplicasMap(TestReplicasMap.class);
|
||||
public class TestReplicaMap {
|
||||
private final ReplicaMap map = new ReplicaMap(TestReplicaMap.class);
|
||||
private final String bpid = "BP-TEST";
|
||||
private final Block block = new Block(1234, 1234, 1234);
|
||||
|
|
@ -15,14 +15,23 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode;
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FSDataset.FSVolume;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNode;
|
||||
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
|
||||
import org.apache.hadoop.hdfs.server.datanode.FinalizedReplica;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaAlreadyExistsException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaBeingWritten;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInPipeline;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaInfo;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaNotFoundException;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaUnderRecovery;
|
||||
import org.apache.hadoop.hdfs.server.datanode.ReplicaWaitingToBeRecovered;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
@ -45,7 +54,7 @@ public class TestWriteToReplica {
|
|||
try {
|
||||
cluster.waitActive();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
FSDataset dataSet = (FSDataset)dn.data;
|
||||
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
||||
|
||||
// set up replicasMap
|
||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
|
@ -66,7 +75,7 @@ public class TestWriteToReplica {
|
|||
try {
|
||||
cluster.waitActive();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
FSDataset dataSet = (FSDataset)dn.data;
|
||||
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
||||
|
||||
// set up replicasMap
|
||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
|
@ -86,7 +95,7 @@ public class TestWriteToReplica {
|
|||
try {
|
||||
cluster.waitActive();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
FSDataset dataSet = (FSDataset)dn.data;
|
||||
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
||||
|
||||
// set up replicasMap
|
||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
|
@ -106,7 +115,7 @@ public class TestWriteToReplica {
|
|||
try {
|
||||
cluster.waitActive();
|
||||
DataNode dn = cluster.getDataNodes().get(0);
|
||||
FSDataset dataSet = (FSDataset)dn.data;
|
||||
FsDatasetImpl dataSet = (FsDatasetImpl)DataNodeTestUtils.getFSDataset(dn);
|
||||
|
||||
// set up replicasMap
|
||||
String bpid = cluster.getNamesystem().getBlockPoolId();
|
||||
|
@ -128,7 +137,7 @@ public class TestWriteToReplica {
|
|||
* @return Contrived blocks for further testing.
|
||||
* @throws IOException
|
||||
*/
|
||||
private ExtendedBlock[] setup(String bpid, FSDataset dataSet) throws IOException {
|
||||
private ExtendedBlock[] setup(String bpid, FsDatasetImpl dataSet) throws IOException {
|
||||
// setup replicas map
|
||||
|
||||
ExtendedBlock[] blocks = new ExtendedBlock[] {
|
||||
|
@ -137,8 +146,8 @@ public class TestWriteToReplica {
|
|||
new ExtendedBlock(bpid, 5, 1, 2005), new ExtendedBlock(bpid, 6, 1, 2006)
|
||||
};
|
||||
|
||||
ReplicasMap replicasMap = dataSet.volumeMap;
|
||||
FSVolume vol = dataSet.volumes.getNextVolume(0);
|
||||
ReplicaMap replicasMap = dataSet.volumeMap;
|
||||
FsVolumeImpl vol = dataSet.volumes.getNextVolume(0);
|
||||
ReplicaInfo replicaInfo = new FinalizedReplica(
|
||||
blocks[FINALIZED].getLocalBlock(), vol, vol.getCurrentDir().getParentFile());
|
||||
replicasMap.add(bpid, replicaInfo);
|
||||
|
@ -165,9 +174,9 @@ public class TestWriteToReplica {
|
|||
return blocks;
|
||||
}
|
||||
|
||||
private void testAppend(String bpid, FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
|
||||
private void testAppend(String bpid, FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
|
||||
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
|
||||
final FSVolume v = (FSVolume)dataSet.volumeMap.get(
|
||||
final FsVolumeImpl v = (FsVolumeImpl)dataSet.volumeMap.get(
|
||||
bpid, blocks[FINALIZED].getLocalBlock()).getVolume();
|
||||
long available = v.getCapacity()-v.getDfsUsed();
|
||||
long expectedLen = blocks[FINALIZED].getNumBytes();
|
||||
|
@ -285,7 +294,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
}
|
||||
|
||||
private void testClose(FSDataset dataSet, ExtendedBlock [] blocks) throws IOException {
|
||||
private void testClose(FsDatasetImpl dataSet, ExtendedBlock [] blocks) throws IOException {
|
||||
long newGS = blocks[FINALIZED].getGenerationStamp()+1;
|
||||
dataSet.recoverClose(blocks[FINALIZED], newGS,
|
||||
blocks[FINALIZED].getNumBytes()); // successful
|
||||
|
@ -335,7 +344,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
}
|
||||
|
||||
private void testWriteToRbw(FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
|
||||
private void testWriteToRbw(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
|
||||
try {
|
||||
dataSet.recoverRbw(blocks[FINALIZED],
|
||||
blocks[FINALIZED].getGenerationStamp()+1,
|
||||
|
@ -428,7 +437,7 @@ public class TestWriteToReplica {
|
|||
dataSet.createRbw(blocks[NON_EXISTENT]);
|
||||
}
|
||||
|
||||
private void testWriteToTemporary(FSDataset dataSet, ExtendedBlock[] blocks) throws IOException {
|
||||
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
|
||||
try {
|
||||
dataSet.createTemporary(blocks[FINALIZED]);
|
||||
Assert.fail("Should not have created a temporary replica that was " +
|
Loading…
Reference in New Issue