HDFS-8188. Erasure coding: refactor client-related code to sync with HDFS-8082 and HDFS-8169. Contributed by Zhe Zhang.

This commit is contained in:
Zhe Zhang 2015-04-20 14:19:12 -07:00 committed by Zhe Zhang
parent dfba46ab57
commit 922631f04f
6 changed files with 54 additions and 35 deletions

View File

@ -177,6 +177,18 @@ public interface HdfsClientConfigKeys {
int THREADPOOL_SIZE_DEFAULT = 0; int THREADPOOL_SIZE_DEFAULT = 0;
} }
/** dfs.client.read.striped configuration properties */
interface StripedRead {
String PREFIX = Read.PREFIX + "striped.";
String THREADPOOL_SIZE_KEY = PREFIX + "threadpool.size";
/**
* With default 6+3 schema, each normal read could span 6 DNs. So this
* default value accommodates 3 read streams
*/
int THREADPOOL_SIZE_DEFAULT = 18;
}
/** dfs.http.client configuration properties */ /** dfs.http.client configuration properties */
interface HttpClient { interface HttpClient {
String PREFIX = "dfs.http.client."; String PREFIX = "dfs.http.client.";

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.StorageType; import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeStorageInfo;
import java.util.Arrays; import java.util.Arrays;
@ -43,14 +42,6 @@ public class LocatedStripedBlock extends LocatedBlock {
System.arraycopy(indices, 0, blockIndices, 0, indices.length); System.arraycopy(indices, 0, blockIndices, 0, indices.length);
} }
public LocatedStripedBlock(ExtendedBlock b, DatanodeStorageInfo[] storages,
int[] indices, long startOffset, boolean corrupt) {
this(b, DatanodeStorageInfo.toDatanodeInfos(storages),
DatanodeStorageInfo.toStorageIDs(storages),
DatanodeStorageInfo.toStorageTypes(storages), indices,
startOffset, corrupt, EMPTY_LOCS);
}
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + "{" + getBlock() return getClass().getSimpleName() + "{" + getBlock()

View File

@ -382,21 +382,12 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
dfsClientConf); dfsClientConf);
if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) { if (dfsClientConf.getHedgedReadThreadpoolSize() > 0) {
this.initThreadsNumForHedgedReads(dfsClientConf.getHedgedReadThreadpoolSize()); this.initThreadsNumForHedgedReads(dfsClientConf.
getHedgedReadThreadpoolSize());
} }
numThreads = conf.getInt(
DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE, this.initThreadsNumForStripedReads(dfsClientConf.
DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE); getStripedReadThreadpoolSize());
if (numThreads <= 0) {
LOG.warn("The value of "
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_SIZE
+ " must be greater than 0. The current setting is " + numThreads
+ ". Reset it to the default value "
+ DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE);
numThreads =
DFSConfigKeys.DFS_CLIENT_STRIPED_READ_THREADPOOL_MAX_DEFAULT_SIZE;
}
this.initThreadsNumForStripedReads(numThreads);
this.saslClient = new SaslDataTransferClient( this.saslClient = new SaslDataTransferClient(
conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf),
TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth); TrustedChannelResolver.getInstance(conf), nnFallbackToSimpleAuth);

View File

@ -38,6 +38,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIM
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import com.google.common.base.Preconditions;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.HadoopIllegalArgumentException;
@ -101,6 +102,8 @@ public class DfsClientConf {
private final long hedgedReadThresholdMillis; private final long hedgedReadThresholdMillis;
private final int hedgedReadThreadpoolSize; private final int hedgedReadThreadpoolSize;
private final int stripedReadThreadpoolSize;
public DfsClientConf(Configuration conf) { public DfsClientConf(Configuration conf) {
// The hdfsTimeout is currently the same as the ipc timeout // The hdfsTimeout is currently the same as the ipc timeout
hdfsTimeout = Client.getTimeout(conf); hdfsTimeout = Client.getTimeout(conf);
@ -215,6 +218,13 @@ public class DfsClientConf {
hedgedReadThreadpoolSize = conf.getInt( hedgedReadThreadpoolSize = conf.getInt(
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY, HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_KEY,
HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT); HdfsClientConfigKeys.HedgedRead.THREADPOOL_SIZE_DEFAULT);
stripedReadThreadpoolSize = conf.getInt(
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY,
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_DEFAULT);
Preconditions.checkArgument(stripedReadThreadpoolSize > 0, "The value of " +
HdfsClientConfigKeys.StripedRead.THREADPOOL_SIZE_KEY +
" must be greater than 0.");
} }
private DataChecksum.Type getChecksumType(Configuration conf) { private DataChecksum.Type getChecksumType(Configuration conf) {
@ -491,6 +501,13 @@ public class DfsClientConf {
return hedgedReadThreadpoolSize; return hedgedReadThreadpoolSize;
} }
/**
* @return the stripedReadThreadpoolSize
*/
public int getStripedReadThreadpoolSize() {
return stripedReadThreadpoolSize;
}
/** /**
* @return the shortCircuitConf * @return the shortCircuitConf
*/ */

View File

@ -874,7 +874,7 @@ public class BlockManager {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
blk); blk);
return new LocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos, return newLocatedStripedBlock(eb, storages, uc.getBlockIndices(), pos,
false); false);
} else { } else {
assert blk instanceof BlockInfoContiguousUnderConstruction; assert blk instanceof BlockInfoContiguousUnderConstruction;
@ -883,13 +883,8 @@ public class BlockManager {
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations(); final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(),
blk); blk);
return new LocatedBlock(eb, storages, pos, false); return newLocatedBlock(eb, storages, pos, false);
} }
final BlockInfoContiguousUnderConstruction uc =
(BlockInfoContiguousUnderConstruction) blk;
final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return newLocatedBlock(eb, storages, pos, false);
} }
// get block locations // get block locations
@ -932,7 +927,7 @@ public class BlockManager {
final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk); final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
return blockIndices == null ? return blockIndices == null ?
newLocatedBlock(eb, machines, pos, isCorrupt) : newLocatedBlock(eb, machines, pos, isCorrupt) :
new LocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt); newLocatedStripedBlock(eb, machines, blockIndices, pos, isCorrupt);
} }
/** Create a LocatedBlocks. */ /** Create a LocatedBlocks. */
@ -3920,6 +3915,18 @@ public class BlockManager {
null); null);
} }
public static LocatedStripedBlock newLocatedStripedBlock(
ExtendedBlock b, DatanodeStorageInfo[] storages,
int[] indices, long startOffset, boolean corrupt) {
// startOffset is unknown
return new LocatedStripedBlock(
b, DatanodeStorageInfo.toDatanodeInfos(storages),
DatanodeStorageInfo.toStorageIDs(storages),
DatanodeStorageInfo.toStorageTypes(storages),
indices, startOffset, corrupt,
null);
}
/** /**
* This class is used internally by {@link this#computeRecoveryWorkForBlocks} * This class is used internally by {@link this#computeRecoveryWorkForBlocks}
* to represent a task to recover a block through replication or erasure * to represent a task to recover a block through replication or erasure

View File

@ -28,6 +28,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.fs.permission.PermissionStatus; import org.apache.hadoop.fs.permission.PermissionStatus;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsConstantsClient;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStriped;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite; import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
@ -45,7 +46,7 @@ public class TestStripedINodeFile {
"userName", null, FsPermission.getDefault()); "userName", null, FsPermission.getDefault());
private static INodeFile createStripedINodeFile() { private static INodeFile createStripedINodeFile() {
return new INodeFile(INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L, return new INodeFile(HdfsConstantsClient.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
null, (short)0, 1024L, HdfsConstants.COLD_STORAGE_POLICY_ID); null, (short)0, 1024L, HdfsConstants.COLD_STORAGE_POLICY_ID);
} }