HDFS-5542. Fix TODO and clean up the code in HDFS-2832. (Contributed by szetszwo)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1544664 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-11-22 20:07:10 +00:00
parent b33ff7b5f2
commit 97acde2d33
42 changed files with 324 additions and 314 deletions

View File

@ -115,3 +115,6 @@ IMPROVEMENTS:
HDFS-5547. Fix build break after merge from trunk to HDFS-2832. (Arpit
Agarwal)
HDFS-5542. Fix TODO and clean up the code in HDFS-2832. (Contributed by
szetszwo)

View File

@ -24,22 +24,12 @@ import org.apache.hadoop.classification.InterfaceStability;
/**
* Defines the types of supported storage media. The default storage
* medium is assumed to be DISK.
*
*/
@InterfaceAudience.Public
@InterfaceStability.Evolving
@InterfaceStability.Unstable
public enum StorageType {
DISK(1),
SSD(2);
DISK,
SSD;
public static StorageType DEFAULT = DISK;
private final int storageType;
StorageType(int medium) {
storageType = medium;
}
public int getStorageType() {
return this.storageType;
}
}

View File

@ -21,6 +21,8 @@ package org.apache.hadoop.hdfs.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import com.google.common.annotations.VisibleForTesting;
/**
* This class represents the primary identifier for a Datanode.
* Datanodes are identified by how they can be contacted (hostname
@ -45,9 +47,11 @@ public class DatanodeID implements Comparable<DatanodeID> {
private int infoSecurePort; // info server port
private int ipcPort; // IPC server port
// UUID identifying a given datanode. For upgraded Datanodes this is the
// same as the StorageID that was previously used by this Datanode. For
// newly formatted Datanodes it is a UUID.
/**
* UUID identifying a given datanode. For upgraded Datanodes this is the
* same as the StorageID that was previously used by this Datanode.
* For newly formatted Datanodes it is a UUID.
*/
private String datanodeUuid = null;
public DatanodeID(DatanodeID from) {
@ -99,7 +103,8 @@ public class DatanodeID implements Comparable<DatanodeID> {
return datanodeUuid;
}
public void setDatanodeUuid(String datanodeUuid) {
@VisibleForTesting
public void setDatanodeUuidForTesting(String datanodeUuid) {
this.datanodeUuid = datanodeUuid;
}

View File

@ -124,13 +124,13 @@ public class DatanodeInfo extends DatanodeID implements Node {
/** Constructor */
public DatanodeInfo(final String ipAddr, final String hostName,
final String DatanodeUuid, final int xferPort, final int infoPort,
final String datanodeUuid, final int xferPort, final int infoPort,
final int infoSecurePort, final int ipcPort,
final long capacity, final long dfsUsed, final long remaining,
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
final long lastUpdate, final int xceiverCount,
final String networkLocation, final AdminStates adminState) {
super(ipAddr, hostName, DatanodeUuid, xferPort, infoPort,
super(ipAddr, hostName, datanodeUuid, xferPort, infoPort,
infoSecurePort, ipcPort);
this.capacity = capacity;
this.dfsUsed = dfsUsed;

View File

@ -123,10 +123,6 @@ public class LocatedBlock {
return locs;
}
public void setStorageTypes(StorageType[] storageTypes) {
this.storageTypes = storageTypes;
}
public StorageType[] getStorageTypes() {
return storageTypes;
}

View File

@ -614,7 +614,7 @@ public class PBHelper {
}
final String[] storageIDs = b.getStorageIDs();
if (storageIDs != null) {
builder.addAllStorageIDs(Arrays.asList(b.getStorageIDs()));
builder.addAllStorageIDs(Arrays.asList(storageIDs));
}
return builder.setB(PBHelper.convert(b.getBlock()))
@ -630,22 +630,23 @@ public class PBHelper {
targets[i] = PBHelper.convert(locs.get(i));
}
List<StorageTypeProto> storageTypesList = proto.getStorageTypesList();
StorageType[] storageTypes = new StorageType[locs.size()];
// The media should correspond to targets 1:1. If not then
// ignore the media information (left as default).
if ((storageTypesList != null) &&
(storageTypesList.size() == locs.size())) {
for (int i = 0; i < storageTypesList.size(); ++i) {
storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
}
final int storageTypesCount = proto.getStorageTypesCount();
final StorageType[] storageTypes;
if (storageTypesCount == 0) {
storageTypes = null;
} else {
Preconditions.checkState(storageTypesCount == locs.size());
storageTypes = convertStorageTypeProtos(proto.getStorageTypesList());
}
final int storageIDsCount = proto.getStorageIDsCount();
final String[] storageIDs = storageIDsCount == 0? null
: proto.getStorageIDsList().toArray(new String[storageIDsCount]);
final String[] storageIDs;
if (storageIDsCount == 0) {
storageIDs = null;
} else {
Preconditions.checkState(storageIDsCount == locs.size());
storageIDs = proto.getStorageIDsList().toArray(new String[storageIDsCount]);
}
// Set values from the isCached list, re-using references from loc
List<DatanodeInfo> cachedLocs = new ArrayList<DatanodeInfo>(locs.size());
@ -1498,23 +1499,15 @@ public class PBHelper {
case SSD:
return StorageTypeProto.SSD;
default:
Preconditions.checkState(
false,
"Failed to update StorageTypeProto with new StorageType " +
type.toString());
return StorageTypeProto.DISK;
throw new IllegalStateException(
"BUG: StorageType not found, type=" + type);
}
}
public static DatanodeStorage convert(DatanodeStorageProto s) {
if (s.hasStorageType()) {
return new DatanodeStorage(s.getStorageUuid(),
PBHelper.convertState(s.getState()),
PBHelper.convertType(s.getStorageType()));
} else {
return new DatanodeStorage(s.getStorageUuid(),
PBHelper.convertState(s.getState()));
}
return new DatanodeStorage(s.getStorageUuid(),
PBHelper.convertState(s.getState()),
PBHelper.convertType(s.getStorageType()));
}
private static State convertState(StorageState state) {
@ -1534,10 +1527,20 @@ public class PBHelper {
case SSD:
return StorageType.SSD;
default:
return StorageType.DEFAULT;
throw new IllegalStateException(
"BUG: StorageTypeProto not found, type=" + type);
}
}
private static StorageType[] convertStorageTypeProtos(
List<StorageTypeProto> storageTypesList) {
final StorageType[] storageTypes = new StorageType[storageTypesList.size()];
for (int i = 0; i < storageTypes.length; ++i) {
storageTypes[i] = PBHelper.convertType(storageTypesList.get(i));
}
return storageTypes;
}
public static StorageReportProto convert(StorageReport r) {
StorageReportProto.Builder builder = StorageReportProto.newBuilder()
.setBlockPoolUsed(r.getBlockPoolUsed()).setCapacity(r.getCapacity())

View File

@ -115,7 +115,7 @@ class BlocksMap {
/**
* Searches for the block in the BlocksMap and
* returns {@link Iterable} that iterates through the nodes the block belongs to.
* returns {@link Iterable} of the storages the block belongs to.
*/
Iterable<DatanodeStorageInfo> getStorages(Block b) {
return getStorages(blocks.get(b));
@ -123,7 +123,7 @@ class BlocksMap {
/**
* For a block that has already been retrieved from the BlocksMap
* returns {@link Iterable} that iterates through the nodes the block belongs to.
* returns {@link Iterable} of the storages the block belongs to.
*/
Iterable<DatanodeStorageInfo> getStorages(final BlockInfo storedBlock) {
return new Iterable<DatanodeStorageInfo>() {

View File

@ -41,8 +41,6 @@ import org.apache.hadoop.hdfs.util.LightWeightHashSet;
import org.apache.hadoop.util.IntrusiveCollection;
import org.apache.hadoop.util.Time;
import com.google.common.annotations.VisibleForTesting;
/**
* This class extends the DatanodeInfo class with ephemeral information (eg
* health, capacity, what blocks are associated with the Datanode) that is
@ -176,15 +174,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
*/
private long lastCachingDirectiveSentTimeMs;
/**
* Head of the list of blocks on the datanode
*/
private volatile BlockInfo blockList = null;
/**
* Number of blocks on the datanode
*/
private int numBlocks = 0;
// isAlive == heartbeats.contains(this)
// This is an optimization, because contains takes O(n) time on Arraylist
public boolean isAlive = false;
@ -661,8 +650,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
return sb.toString();
}
@VisibleForTesting
public DatanodeStorageInfo updateStorage(DatanodeStorage s) {
DatanodeStorageInfo updateStorage(DatanodeStorage s) {
synchronized (storageMap) {
DatanodeStorageInfo storage = storageMap.get(s.getStorageID());
if (storage == null) {
@ -670,8 +658,6 @@ public class DatanodeDescriptor extends DatanodeInfo {
" for DN " + getXferAddr());
storage = new DatanodeStorageInfo(this, s);
storageMap.put(s.getStorageID(), storage);
} else {
storage.setState(s.getState());
}
return storage;
}

View File

@ -99,7 +99,7 @@ public class DatanodeStorageInfo {
private final DatanodeDescriptor dn;
private final String storageID;
private final StorageType storageType;
private State state;
private final State state;
private long capacity;
private long dfsUsed;
@ -128,7 +128,7 @@ public class DatanodeStorageInfo {
*/
private boolean blockContentsStale = true;
public DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
DatanodeStorageInfo(DatanodeDescriptor dn, DatanodeStorage s) {
this.dn = dn;
this.storageID = s.getStorageID();
this.storageType = s.getStorageType();
@ -165,7 +165,7 @@ public class DatanodeStorageInfo {
}
@VisibleForTesting
public void setUtilization(long capacity, long dfsUsed,
public void setUtilizationForTesting(long capacity, long dfsUsed,
long remaining, long blockPoolUsed) {
this.capacity = capacity;
this.dfsUsed = dfsUsed;
@ -173,41 +173,35 @@ public class DatanodeStorageInfo {
this.blockPoolUsed = blockPoolUsed;
}
public void setState(State s) {
this.state = s;
// TODO: if goes to failed state cleanup the block list
}
public State getState() {
State getState() {
return this.state;
}
public String getStorageID() {
String getStorageID() {
return storageID;
}
public StorageType getStorageType() {
StorageType getStorageType() {
return storageType;
}
public long getCapacity() {
long getCapacity() {
return capacity;
}
public long getDfsUsed() {
long getDfsUsed() {
return dfsUsed;
}
public long getRemaining() {
long getRemaining() {
return remaining;
}
public long getBlockPoolUsed() {
long getBlockPoolUsed() {
return blockPoolUsed;
}
public boolean addBlock(BlockInfo b) {
boolean addBlock(BlockInfo b) {
if(!b.addStorage(this))
return false;
// add to the head of the data-node list
@ -216,7 +210,7 @@ public class DatanodeStorageInfo {
return true;
}
public boolean removeBlock(BlockInfo b) {
boolean removeBlock(BlockInfo b) {
blockList = b.listRemove(blockList, this);
if (b.removeStorage(this)) {
numBlocks--;
@ -226,7 +220,7 @@ public class DatanodeStorageInfo {
}
}
public int numBlocks() {
int numBlocks() {
return numBlocks;
}
@ -249,11 +243,11 @@ public class DatanodeStorageInfo {
* @return the head of the blockList
*/
@VisibleForTesting
protected BlockInfo getHead(){
BlockInfo getBlockListHeadForTesting(){
return blockList;
}
public void updateState(StorageReport r) {
void updateState(StorageReport r) {
capacity = r.getCapacity();
dfsUsed = r.getDfsUsed();
remaining = r.getRemaining();

View File

@ -198,8 +198,8 @@ class PendingReplicationBlocks {
}
}
void decrementReplicas(DatanodeDescriptor target) {
targets.remove(target);
void decrementReplicas(DatanodeDescriptor dn) {
targets.remove(dn);
}
int getNumReplicas() {

View File

@ -237,8 +237,7 @@ public abstract class Storage extends StorageInfo {
final StorageDirType dirType; // storage dir type
FileLock lock; // storage lock
//TODO HDFS-2832: Consider moving this out of StorageDirectory.
String storageUuid = null; // Storage directory identifier.
private String storageUuid = null; // Storage directory identifier.
public StorageDirectory(File dir) {
// default dirType is null

View File

@ -331,8 +331,7 @@ class BPOfferService {
}
}
synchronized DatanodeRegistration createRegistration()
throws IOException {
synchronized DatanodeRegistration createRegistration() throws IOException {
Preconditions.checkState(bpNSInfo != null,
"getRegistration() can only be called after initial handshake");
return dn.createBPRegistration(bpNSInfo);

View File

@ -251,10 +251,7 @@ class BPServiceActor implements Runnable {
DatanodeInfo[] dnArr = { new DatanodeInfo(bpRegistration) };
String[] uuids = { storageUuid };
StorageType[] types = { storageType };
// TODO: Corrupt flag is set to false for compatibility. We can probably
// set it to true here.
LocatedBlock[] blocks = {
new LocatedBlock(block, dnArr, uuids, types) };
LocatedBlock[] blocks = { new LocatedBlock(block, dnArr, uuids, types) };
try {
bpNamenode.reportBadBlocks(blocks);

View File

@ -255,8 +255,8 @@ class BlockReceiver implements Closeable {
/** Return the datanode object. */
DataNode getDataNode() {return datanode;}
public Replica getReplicaInfo() {
return replicaInfo;
String getStorageUuid() {
return replicaInfo.getStorageUuid();
}
/**

View File

@ -18,10 +18,40 @@
package org.apache.hadoop.hdfs.server.datanode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.util.ExitUtil.terminate;
import java.io.BufferedOutputStream;
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PrintStream;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.net.UnknownHostException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -38,15 +68,37 @@ import org.apache.hadoop.hdfs.HDFSPolicyProvider;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.net.DomainPeerServer;
import org.apache.hadoop.hdfs.net.TcpPeerServer;
import org.apache.hadoop.hdfs.protocol.*;
import org.apache.hadoop.hdfs.protocol.datatransfer.*;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockLocalPathInfo;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsBlocksMetadata;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocol.datatransfer.BlockConstructionStage;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferEncryptor;
import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
import org.apache.hadoop.hdfs.protocol.datatransfer.IOStreamPair;
import org.apache.hadoop.hdfs.protocol.datatransfer.Sender;
import org.apache.hadoop.hdfs.protocol.proto.ClientDatanodeProtocolProtos.ClientDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.DNTransferAckProto;
import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InterDatanodeProtocolService;
import org.apache.hadoop.hdfs.protocolPB.*;
import org.apache.hadoop.hdfs.security.token.block.*;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.ClientDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.InterDatanodeProtocolTranslatorPB;
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.security.token.block.BlockPoolTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.StartupOption;
@ -59,8 +111,12 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics;
import org.apache.hadoop.hdfs.server.datanode.web.resources.DatanodeWebHdfsMethods;
import org.apache.hadoop.hdfs.server.namenode.FileChecksumServlets;
import org.apache.hadoop.hdfs.server.namenode.StreamFile;
import org.apache.hadoop.hdfs.server.protocol.*;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.web.WebHdfsFileSystem;
import org.apache.hadoop.hdfs.web.resources.Param;
import org.apache.hadoop.http.HttpServer;
@ -82,23 +138,21 @@ import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.*;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DiskChecker;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
import org.apache.hadoop.util.GenericOptionsParser;
import org.apache.hadoop.util.JvmPauseMonitor;
import org.apache.hadoop.util.ServicePlugin;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.VersionInfo;
import org.mortbay.util.ajax.JSON;
import java.io.*;
import java.net.*;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
import java.security.PrivilegedExceptionAction;
import java.util.*;
import java.util.concurrent.atomic.AtomicInteger;
import javax.management.ObjectName;
import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
import static org.apache.hadoop.util.ExitUtil.terminate;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.protobuf.BlockingService;
/**********************************************************
* DataNode is a class (and program) that stores a set of
@ -204,7 +258,7 @@ public class DataNode extends Configured
private JvmPauseMonitor pauseMonitor;
private SecureResources secureResources = null;
private AbstractList<StorageLocation> dataDirs;
private List<StorageLocation> dataDirs;
private Configuration conf;
private final long maxNumberOfBlocksToLog;
@ -219,7 +273,7 @@ public class DataNode extends Configured
* and a namenode proxy
*/
DataNode(final Configuration conf,
final AbstractList<StorageLocation> dataDirs,
final List<StorageLocation> dataDirs,
final SecureResources resources) throws IOException {
super(conf);
this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,
@ -640,7 +694,7 @@ public class DataNode extends Configured
* @throws IOException
*/
void startDataNode(Configuration conf,
AbstractList<StorageLocation> dataDirs,
List<StorageLocation> dataDirs,
// DatanodeProtocol namenode,
SecureResources resources
) throws IOException {
@ -915,7 +969,7 @@ public class DataNode extends Configured
/**
* NB: The datanode can perform data transfer on the streaming
* address however clients are given the IPC IP address for data
* transfer, and that may be a different address.
* transfer, and that may be a different address.
*
* @return socket address for data transfer
*/
@ -1660,21 +1714,19 @@ public class DataNode extends Configured
return makeInstance(dataLocations, conf, resources);
}
static Collection<StorageLocation> parseStorageLocations(
Collection<String> rawLocations) {
public static List<StorageLocation> getStorageLocations(Configuration conf) {
Collection<String> rawLocations =
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY);
List<StorageLocation> locations =
new ArrayList<StorageLocation>(rawLocations.size());
for(String locationString : rawLocations) {
StorageLocation location;
final StorageLocation location;
try {
location = StorageLocation.parse(locationString);
} catch (IOException ioe) {
LOG.error("Failed to parse storage location " + locationString);
continue;
} catch (IllegalArgumentException iae) {
LOG.error(iae.toString());
continue;
throw new IllegalArgumentException("Failed to parse conf property "
+ DFS_DATANODE_DATA_DIR_KEY + ": " + locationString, ioe);
}
locations.add(location);
@ -1683,12 +1735,6 @@ public class DataNode extends Configured
return locations;
}
public static Collection<StorageLocation> getStorageLocations(
Configuration conf) {
return parseStorageLocations(
conf.getTrimmedStringCollection(DFS_DATANODE_DATA_DIR_KEY));
}
/** Instantiate & Start a single datanode daemon and wait for it to finish.
* If this thread is specifically interrupted, it will stop waiting.
*/
@ -1760,7 +1806,7 @@ public class DataNode extends Configured
DFS_DATANODE_DATA_DIR_PERMISSION_DEFAULT));
DataNodeDiskChecker dataNodeDiskChecker =
new DataNodeDiskChecker(permission);
ArrayList<StorageLocation> locations =
List<StorageLocation> locations =
checkStorageLocations(dataDirs, localFS, dataNodeDiskChecker);
DefaultMetricsSystem.initialize("DataNode");
@ -1769,20 +1815,21 @@ public class DataNode extends Configured
}
// DataNode ctor expects AbstractList instead of List or Collection...
static ArrayList<StorageLocation> checkStorageLocations(
static List<StorageLocation> checkStorageLocations(
Collection<StorageLocation> dataDirs,
LocalFileSystem localFS, DataNodeDiskChecker dataNodeDiskChecker)
throws IOException {
ArrayList<StorageLocation> locations = new ArrayList<StorageLocation>();
StringBuilder invalidDirs = new StringBuilder();
for (StorageLocation location : dataDirs) {
final URI uri = location.getUri();
try {
dataNodeDiskChecker.checkDir(localFS, new Path(location.getUri()));
dataNodeDiskChecker.checkDir(localFS, new Path(uri));
locations.add(location);
} catch (IOException ioe) {
LOG.warn("Invalid " + DFS_DATANODE_DATA_DIR_KEY + " "
+ location.getFile() + " : ", ioe);
invalidDirs.append("\"").append(location.getUri().getPath()).append("\" ");
invalidDirs.append("\"").append(uri.getPath()).append("\" ");
}
}
if (locations.size() == 0) {

View File

@ -105,7 +105,7 @@ public class DataStorage extends Storage {
/** Create an ID for this storage. */
public synchronized void createStorageID(StorageDirectory sd) {
if (sd.getStorageUuid() == null) {
sd.setStorageUuid(DatanodeStorage.newStorageID());
sd.setStorageUuid(DatanodeStorage.generateUuid());
}
}
@ -215,8 +215,8 @@ public class DataStorage extends Storage {
// Create list of storage directories for the block pool
Collection<File> bpDataDirs = new ArrayList<File>();
for(Iterator<StorageLocation> it = dataDirs.iterator(); it.hasNext();) {
File dnRoot = it.next().getFile();
for(StorageLocation dir : dataDirs) {
File dnRoot = dir.getFile();
File bpRoot = BlockPoolSliceStorage.getBpRoot(bpID, new File(dnRoot,
STORAGE_DIR_CURRENT));
bpDataDirs.add(bpRoot);
@ -269,7 +269,7 @@ public class DataStorage extends Storage {
if (sd.getStorageUuid() == null) {
// Assign a new Storage UUID.
sd.setStorageUuid(UUID.randomUUID().toString());
sd.setStorageUuid(DatanodeStorage.generateUuid());
}
writeProperties(sd);
@ -305,8 +305,7 @@ public class DataStorage extends Storage {
/*
* Read ClusterID, StorageID, StorageType, CTime from
* DataStorage VERSION file and verify them.
* Always called just after reading the properties from the VERSION
* file.
* Always called just after reading the properties from the VERSION file.
*/
@Override
protected void setFieldsFromProperties(Properties props, StorageDirectory sd)

View File

@ -447,7 +447,7 @@ class DataXceiver extends Receiver implements Runnable {
String mirrorNode = null; // the name:port of next target
String firstBadLink = ""; // first datanode that failed in connection setup
Status mirrorInStatus = SUCCESS;
Replica replica;
final String storageUuid;
try {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
@ -458,10 +458,10 @@ class DataXceiver extends Receiver implements Runnable {
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy);
replica = blockReceiver.getReplicaInfo();
storageUuid = blockReceiver.getStorageUuid();
} else {
replica =
datanode.data.recoverClose(block, latestGenerationStamp, minBytesRcvd);
storageUuid = datanode.data.recoverClose(
block, latestGenerationStamp, minBytesRcvd);
}
//
@ -593,8 +593,7 @@ class DataXceiver extends Receiver implements Runnable {
// the block is finalized in the PacketResponder.
if (isDatanode ||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
datanode.closeBlock(
block, DataNode.EMPTY_DEL_HINT, replica.getStorageUuid());
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT, storageUuid);
LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
+ localAddress + " of size " + block.getNumBytes());
}
@ -864,7 +863,7 @@ class DataXceiver extends Receiver implements Runnable {
// notify name node
datanode.notifyNamenodeReceivedBlock(
block, delHint, blockReceiver.getReplicaInfo().getStorageUuid());
block, delHint, blockReceiver.getStorageUuid());
LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
+ ", delHint=" + delHint);

View File

@ -19,7 +19,6 @@ package org.apache.hadoop.hdfs.server.datanode;
import java.io.File;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedList;

View File

@ -61,10 +61,6 @@ public class FinalizedReplica extends ReplicaInfo {
this.unlinked = from.isUnlinked();
}
public FinalizedReplica(ReplicaInfo replicaInfo) {
super(replicaInfo);
}
@Override // ReplicaInfo
public ReplicaState getState() {
return ReplicaState.FINALIZED;

View File

@ -18,18 +18,16 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.common.Util;
import java.util.regex.Pattern;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.regex.Matcher;
import static java.util.regex.Pattern.compile;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.server.common.Util;
/**
* Encapsulates the URI and storage medium that together describe a
@ -39,19 +37,15 @@ import static java.util.regex.Pattern.compile;
*/
@InterfaceAudience.Private
public class StorageLocation {
public static final Log LOG = LogFactory.getLog(StorageLocation.class);
final StorageType storageType;
final File file;
// Regular expression that describes a storage uri with a storage type.
// e.g. [Disk]/storages/storage1/
private static final String rawStringRegex = "^\\[(\\w*)\\](.+)$";
/** Regular expression that describes a storage uri with a storage type.
* e.g. [Disk]/storages/storage1/
*/
private static final Pattern regex = Pattern.compile("^\\[(\\w*)\\](.+)$");
StorageLocation(URI uri) {
this(StorageType.DISK, uri);
}
StorageLocation(StorageType storageType, URI uri) {
private StorageLocation(StorageType storageType, URI uri) {
this.storageType = storageType;
if (uri.getScheme() == null ||
@ -59,8 +53,7 @@ public class StorageLocation {
// drop any (illegal) authority in the URI for backwards compatibility
this.file = new File(uri.getPath());
} else {
throw new IllegalArgumentException(
"Got an Unsupported URI schema in " + uri + ". Ignoring ...");
throw new IllegalArgumentException("Unsupported URI schema in " + uri);
}
}
@ -68,7 +61,7 @@ public class StorageLocation {
return this.storageType;
}
public URI getUri() {
URI getUri() {
return file.toURI();
}
@ -85,29 +78,24 @@ public class StorageLocation {
* @return A StorageLocation object if successfully parsed, null otherwise.
* Does not throw any exceptions.
*/
public static StorageLocation parse(String rawLocation) throws IOException {
Matcher matcher = compile(rawStringRegex).matcher(rawLocation);
StorageType storageType = StorageType.DISK;
static StorageLocation parse(String rawLocation) throws IOException {
Matcher matcher = regex.matcher(rawLocation);
StorageType storageType = StorageType.DEFAULT;
String location = rawLocation;
if (matcher.matches()) {
String classString = matcher.group(1);
location = matcher.group(2);
if (!classString.isEmpty()) {
try {
storageType = StorageType.valueOf(classString.toUpperCase());
} catch (RuntimeException re) {
LOG.error("Unable to parse storage type: " + re.toString() +
". Using the default storage type for directory " +
location);
}
storageType = StorageType.valueOf(classString.toUpperCase());
}
}
return new StorageLocation(storageType, Util.stringAsURI(location));
}
@Override
public String toString() {
return "[" + storageType.toString() + "]" + file.toURI().toString();
return "[" + storageType + "]" + file.toURI();
}
}

View File

@ -242,9 +242,10 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @param b block
* @param newGS the new generation stamp for the replica
* @param expectedBlockLen the number of bytes the replica is expected to have
* @return the storage uuid of the replica.
* @throws IOException
*/
public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen
) throws IOException;
/**
@ -264,15 +265,6 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
*/
public void unfinalizeBlock(ExtendedBlock b) throws IOException;
/**
* TODO HDFS-2832: Deprecate this when we fix tests.
* Returns the block report - the full list of blocks stored under a
* block pool
* @param bpid Block Pool Id
* @return - the block report - the full list of blocks stored
*/
public BlockListAsLongs getBlockReport(String bpid);
/**
* Returns one block report per volume.
* @param bpid Block Pool Id

View File

@ -197,8 +197,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
private final int validVolsRequired;
// TODO HDFS-2832: Consider removing duplicated block info from these
// two maps. This might require some refactoring
// rewrite of FsDatasetImpl.
// two maps and move the perVolumeReplicaMap to FsVolumeImpl.
// This might require some refactoring.
final ReplicaMap volumeMap;
// Map from StorageID to ReplicaMap.
@ -726,7 +726,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
@Override // FsDatasetSpi
public Replica recoverClose(ExtendedBlock b, long newGS,
public String recoverClose(ExtendedBlock b, long newGS,
long expectedBlockLen) throws IOException {
LOG.info("Recover failed close " + b);
// check replica's state
@ -737,7 +737,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
if (replicaInfo.getState() == ReplicaState.RBW) {
finalizeReplica(b.getBlockPoolId(), replicaInfo);
}
return replicaInfo;
return replicaInfo.getStorageUuid();
}
/**
@ -1083,14 +1083,6 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
return cacheManager.getCachedBlocks(bpid);
}
/**
* Generates a block report from the in-memory block map.
*/
@Override // FsDatasetSpi
public BlockListAsLongs getBlockReport(String bpid) {
return getBlockReportWithReplicaMap(bpid, volumeMap);
}
@Override
public Map<String, BlockListAsLongs> getBlockReports(String bpid) {
Map<String, BlockListAsLongs> blockReportMap =
@ -1114,7 +1106,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
new ArrayList<FinalizedReplica>(volumeMap.size(bpid));
for (ReplicaInfo b : volumeMap.replicas(bpid)) {
if(b.getState() == ReplicaState.FINALIZED) {
finalized.add(new FinalizedReplica(b));
finalized.add(new FinalizedReplica((FinalizedReplica)b));
}
}
return finalized;

View File

@ -53,7 +53,7 @@ class FsVolumeList {
* @param blockSize free space needed on the volume
* @return next volume to store the block in.
*/
// TODO this will be replaced by getting volume from StorageID directly later.
// TODO should choose volume with storage type
synchronized FsVolumeImpl getNextVolume(long blockSize) throws IOException {
return blockChooser.chooseVolume(volumes, blockSize);
}

View File

@ -119,9 +119,7 @@ class ReplicaMap {
}
/**
* Add all entries from the given replica map into the local replica
* map.
* @param
* Add all entries from the given replica map into the local replica map.
*/
void addAll(ReplicaMap other) {
map.putAll(other.map);

View File

@ -36,8 +36,7 @@ public class DatanodeStorage {
private final StorageType storageType;
/**
* Create a storage with {@link State#NORMAL} and
* {@link org.apache.hadoop.hdfs.StorageType#DEFAULT}.
* Create a storage with {@link State#NORMAL} and {@link StorageType#DEFAULT}.
*
* @param storageID
*/
@ -45,10 +44,6 @@ public class DatanodeStorage {
this(storageID, State.NORMAL, StorageType.DEFAULT);
}
public DatanodeStorage(String sid, State s) {
this(sid, s, StorageType.DEFAULT);
}
public DatanodeStorage(String sid, State s, StorageType sm) {
this.storageID = sid;
this.state = s;
@ -69,11 +64,11 @@ public class DatanodeStorage {
/**
* Generate new storage ID. The format of this string can be changed
* in the future without requiring that old SotrageIDs be updated.
* in the future without requiring that old storage IDs be updated.
*
* @return unique storage ID
*/
public static String newStorageID() {
public static String generateUuid() {
return "DS-" + UUID.randomUUID();
}
}

View File

@ -55,7 +55,7 @@ message DatanodeStorageProto {
required string storageUuid = 1;
optional StorageState state = 2 [default = NORMAL];
optional StorageTypeProto storageType = 3;
optional StorageTypeProto storageType = 3 [default = DISK];
}
/**

View File

@ -853,8 +853,8 @@ public class DFSTestUtil {
public static DatanodeStorageInfo createDatanodeStorageInfo(
String storageID, String ip, String rack) {
final DatanodeStorage storage = new DatanodeStorage(storageID);
return new DatanodeStorageInfo(
BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage), storage);
final DatanodeDescriptor dn = BlockManagerTestUtil.getDatanodeDescriptor(ip, rack, storage);
return BlockManagerTestUtil.newDatanodeStorageInfo(dn, storage);
}
public static DatanodeDescriptor[] toDatanodeDescriptor(
DatanodeStorageInfo[] storages) {

View File

@ -55,6 +55,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Random;
import org.apache.commons.logging.Log;
@ -1969,12 +1970,12 @@ public class MiniDFSCluster {
* @param dataNodeIndex - data node whose block report is desired - the index is same as for getDataNodes()
* @return the block report for the specified data node
*/
public Iterable<Block> getBlockReport(String bpid, int dataNodeIndex) {
public Map<String, BlockListAsLongs> getBlockReport(String bpid, int dataNodeIndex) {
if (dataNodeIndex < 0 || dataNodeIndex > dataNodes.size()) {
throw new IndexOutOfBoundsException();
}
final DataNode dn = dataNodes.get(dataNodeIndex).datanode;
return DataNodeTestUtils.getFSDataset(dn).getBlockReport(bpid);
return DataNodeTestUtils.getFSDataset(dn).getBlockReports(bpid);
}
@ -1983,11 +1984,12 @@ public class MiniDFSCluster {
* @return block reports from all data nodes
* BlockListAsLongs is indexed in the same order as the list of datanodes returned by getDataNodes()
*/
public Iterable<Block>[] getAllBlockReports(String bpid) {
public List<Map<String, BlockListAsLongs>> getAllBlockReports(String bpid) {
int numDataNodes = dataNodes.size();
Iterable<Block>[] result = new BlockListAsLongs[numDataNodes];
final List<Map<String, BlockListAsLongs>> result
= new ArrayList<Map<String, BlockListAsLongs>>(numDataNodes);
for (int i = 0; i < numDataNodes; ++i) {
result[i] = getBlockReport(bpid, i);
result.add(getBlockReport(bpid, i));
}
return result;
}

View File

@ -23,6 +23,7 @@ import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.atomic.AtomicInteger;
@ -35,6 +36,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.tools.DFSAdmin;
@ -1392,11 +1394,14 @@ public class TestDFSShell {
List<File> files = new ArrayList<File>();
List<DataNode> datanodes = cluster.getDataNodes();
String poolId = cluster.getNamesystem().getBlockPoolId();
Iterable<Block>[] blocks = cluster.getAllBlockReports(poolId);
for(int i = 0; i < blocks.length; i++) {
List<Map<String, BlockListAsLongs>> blocks = cluster.getAllBlockReports(poolId);
for(int i = 0; i < blocks.size(); i++) {
DataNode dn = datanodes.get(i);
for(Block b : blocks[i]) {
files.add(DataNodeTestUtils.getFile(dn, poolId, b.getBlockId()));
Map<String, BlockListAsLongs> map = blocks.get(i);
for(Map.Entry<String, BlockListAsLongs> e : map.entrySet()) {
for(Block b : e.getValue()) {
files.add(DataNodeTestUtils.getFile(dn, poolId, b.getBlockId()));
}
}
}
return files;

View File

@ -22,16 +22,16 @@ import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
@ -136,7 +136,7 @@ public class TestInjectionForSimulatedStorage {
DFSTestUtil.createFile(cluster.getFileSystem(), testPath, filesize,
filesize, blockSize, (short) numDataNodes, 0L);
waitForBlockReplication(testFile, dfsClient.getNamenode(), numDataNodes, 20);
Iterable<Block>[] blocksList = cluster.getAllBlockReports(bpid);
List<Map<String, BlockListAsLongs>> blocksList = cluster.getAllBlockReports(bpid);
cluster.shutdown();
cluster = null;
@ -157,9 +157,11 @@ public class TestInjectionForSimulatedStorage {
.build();
cluster.waitActive();
Set<Block> uniqueBlocks = new HashSet<Block>();
for (int i=0; i<blocksList.length; ++i) {
for (Block b : blocksList[i]) {
uniqueBlocks.add(new Block(b));
for(Map<String, BlockListAsLongs> map : blocksList) {
for(BlockListAsLongs blockList : map.values()) {
for(Block b : blockList) {
uniqueBlocks.add(new Block(b));
}
}
}
// Insert all the blocks in the first data node

View File

@ -432,17 +432,18 @@ public class TestPBHelper {
DFSTestUtil.getLocalDatanodeInfo("127.0.0.1", "h3",
AdminStates.NORMAL)
};
String[] storageIDs = {"s1", "s2", "s3"};
StorageType[] media = {
StorageType.DISK,
StorageType.SSD,
StorageType.DISK
};
LocatedBlock lb = new LocatedBlock(
new ExtendedBlock("bp12", 12345, 10, 53), dnInfos, 5, false);
new ExtendedBlock("bp12", 12345, 10, 53),
dnInfos, storageIDs, media, 5, false, new DatanodeInfo[]{});
lb.setBlockToken(new Token<BlockTokenIdentifier>(
"identifier".getBytes(), "password".getBytes(), new Text("kind"),
new Text("service")));
lb.setStorageTypes(media);
return lb;
}

View File

@ -23,7 +23,6 @@ import java.util.Collection;
import java.util.HashSet;
import java.util.Set;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
@ -219,11 +218,16 @@ public class BlockManagerTestUtil {
bm.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
}
public static DatanodeStorageInfo updateStorage(DatanodeDescriptor dn,
DatanodeStorage s) {
return dn.updateStorage(s);
}
public static DatanodeDescriptor getLocalDatanodeDescriptor(
boolean initializeStorage) {
DatanodeDescriptor dn = new DatanodeDescriptor(DFSTestUtil.getLocalDatanodeID());
if (initializeStorage) {
dn.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
dn.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
}
return dn;
}
@ -231,7 +235,7 @@ public class BlockManagerTestUtil {
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
String rackLocation, boolean initializeStorage) {
return getDatanodeDescriptor(ipAddr, rackLocation,
initializeStorage? new DatanodeStorage(DatanodeStorage.newStorageID()): null);
initializeStorage? new DatanodeStorage(DatanodeStorage.generateUuid()): null);
}
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
@ -244,6 +248,10 @@ public class BlockManagerTestUtil {
return dn;
}
public static DatanodeStorageInfo newDatanodeStorageInfo(
DatanodeDescriptor dn, DatanodeStorage s) {
return new DatanodeStorageInfo(dn, s);
}
public static StorageReport[] getStorageReportsForDatanode(
DatanodeDescriptor dnd) {

View File

@ -76,7 +76,7 @@ public class TestBlockInfo {
}
assertEquals("There should be MAX_BLOCK blockInfo's", MAX_BLOCKS, len);
headIndex = dd.getHead().findStorageInfo(dd);
headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
LOG.info("Moving each block to the head of the list...");
for (int i = 0; i < MAX_BLOCKS; i++) {
@ -84,23 +84,23 @@ public class TestBlockInfo {
headIndex = dd.moveBlockToHead(blockInfoList.get(i), curIndex, headIndex);
// the moved element must be at the head of the list
assertEquals("Block should be at the head of the list now.",
blockInfoList.get(i), dd.getHead());
blockInfoList.get(i), dd.getBlockListHeadForTesting());
}
// move head of the list to the head - this should not change the list
LOG.info("Moving head to the head...");
BlockInfo temp = dd.getHead();
BlockInfo temp = dd.getBlockListHeadForTesting();
curIndex = 0;
headIndex = 0;
dd.moveBlockToHead(temp, curIndex, headIndex);
assertEquals(
"Moving head to the head of the list shopuld not change the list",
temp, dd.getHead());
temp, dd.getBlockListHeadForTesting());
// check all elements of the list against the original blockInfoList
LOG.info("Checking elements of the list...");
temp = dd.getHead();
temp = dd.getBlockListHeadForTesting();
assertNotNull("Head should not be null", temp);
int c = MAX_BLOCKS - 1;
while (temp != null) {
@ -110,7 +110,7 @@ public class TestBlockInfo {
}
LOG.info("Moving random blocks to the head of the list...");
headIndex = dd.getHead().findStorageInfo(dd);
headIndex = dd.getBlockListHeadForTesting().findStorageInfo(dd);
Random rand = new Random();
for (int i = 0; i < MAX_BLOCKS; i++) {
int j = rand.nextInt(MAX_BLOCKS);
@ -118,7 +118,7 @@ public class TestBlockInfo {
headIndex = dd.moveBlockToHead(blockInfoList.get(j), curIndex, headIndex);
// the moved element must be at the head of the list
assertEquals("Block should be at the head of the list now.",
blockInfoList.get(j), dd.getHead());
blockInfoList.get(j), dd.getBlockListHeadForTesting());
}
}
}

View File

@ -103,7 +103,7 @@ public class TestBlockManager {
// construct network topology
for (DatanodeDescriptor dn : nodesToAdd) {
cluster.add(dn);
dn.getStorageInfos()[0].setUtilization(
dn.getStorageInfos()[0].setUtilizationForTesting(
2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L,
2 * HdfsConstants.MIN_BLOCKS_FOR_WRITE*BLOCK_SIZE, 0L);
dn.updateHeartbeat(
@ -295,7 +295,7 @@ public class TestBlockManager {
// the third off-rack replica.
DatanodeDescriptor rackCNode =
DFSTestUtil.getDatanodeDescriptor("7.7.7.7", "/rackC");
rackCNode.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
rackCNode.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
addNodes(ImmutableList.of(rackCNode));
try {
DatanodeStorageInfo[] pipeline2 = scheduleSingleReplication(blockInfo);
@ -531,7 +531,7 @@ public class TestBlockManager {
DatanodeStorageInfo ds = node.getStorageInfos()[0];
// TODO: Needs to be fixed. DatanodeUuid is not storageID.
node.setDatanodeUuid(ds.getStorageID());
node.setDatanodeUuidForTesting(ds.getStorageID());
node.isAlive = true;
@ -577,7 +577,7 @@ public class TestBlockManager {
DatanodeStorageInfo ds = node.getStorageInfos()[0];
// TODO: Needs to be fixed. DatanodeUuid is not storageID.
node.setDatanodeUuid(ds.getStorageID());
node.setDatanodeUuidForTesting(ds.getStorageID());
node.isAlive = true;

View File

@ -63,7 +63,7 @@ public class TestHeartbeatHandling {
final DatanodeRegistration nodeReg =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd = NameNodeAdapter.getDatanode(namesystem, nodeReg);
final String storageID = DatanodeStorage.newStorageID();
final String storageID = DatanodeStorage.generateUuid();
dd.updateStorage(new DatanodeStorage(storageID));
final int REMAINING_BLOCKS = 1;
@ -146,15 +146,15 @@ public class TestHeartbeatHandling {
final DatanodeRegistration nodeReg1 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(0), poolId);
final DatanodeDescriptor dd1 = NameNodeAdapter.getDatanode(namesystem, nodeReg1);
dd1.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
dd1.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
final DatanodeRegistration nodeReg2 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(1), poolId);
final DatanodeDescriptor dd2 = NameNodeAdapter.getDatanode(namesystem, nodeReg2);
dd2.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
dd2.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
final DatanodeRegistration nodeReg3 =
DataNodeTestUtils.getDNRegistrationForBP(cluster.getDataNodes().get(2), poolId);
final DatanodeDescriptor dd3 = NameNodeAdapter.getDatanode(namesystem, nodeReg3);
dd3.updateStorage(new DatanodeStorage(DatanodeStorage.newStorageID()));
dd3.updateStorage(new DatanodeStorage(DatanodeStorage.generateUuid()));
try {
namesystem.writeLock();

View File

@ -103,7 +103,7 @@ public class TestOverReplicatedBlocks {
String corruptMachineName = corruptDataNode.getXferAddr();
for (DatanodeDescriptor datanode : hm.getDatanodes()) {
if (!corruptMachineName.equals(datanode.getXferAddr())) {
datanode.getStorageInfos()[0].setUtilization(100L, 100L, 0, 100L);
datanode.getStorageInfos()[0].setUtilizationForTesting(100L, 100L, 0, 100L);
datanode.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(datanode),
0L, 0L, 0, 0);

View File

@ -93,7 +93,7 @@ public class TestReplicationPolicy {
private static void updateHeartbeatWithUsage(DatanodeDescriptor dn,
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
long dnCacheCapacity, long dnCacheUsed, int xceiverCount, int volFailures) {
dn.getStorageInfos()[0].setUtilization(
dn.getStorageInfos()[0].setUtilizationForTesting(
capacity, dfsUsed, remaining, blockPoolUsed);
dn.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dn),

View File

@ -150,7 +150,7 @@ public class TestReplicationPolicyWithNodeGroup {
long capacity, long dfsUsed, long remaining, long blockPoolUsed,
long dnCacheCapacity, long dnCacheUsed, int xceiverCount,
int volFailures) {
dn.getStorageInfos()[0].setUtilization(
dn.getStorageInfos()[0].setUtilizationForTesting(
capacity, dfsUsed, remaining, blockPoolUsed);
dn.updateHeartbeat(
BlockManagerTestUtil.getStorageReportsForDatanode(dn),

View File

@ -17,12 +17,33 @@
*/
package org.apache.hadoop.hdfs.server.common;
import com.google.common.base.Strings;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.text.MessageFormat;
import java.util.ArrayList;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.jsp.JspWriter;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.namenode.NameNodeHttpServer;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
@ -48,20 +69,7 @@ import org.mockito.stubbing.Answer;
import org.xml.sax.InputSource;
import org.xml.sax.SAXException;
import javax.servlet.ServletContext;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.jsp.JspWriter;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import java.io.IOException;
import java.io.StringReader;
import java.net.InetSocketAddress;
import java.text.MessageFormat;
import java.util.ArrayList;
import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import com.google.common.base.Strings;
public class TestJspHelper {
@ -459,8 +467,8 @@ public class TestJspHelper {
DatanodeDescriptor dnDesc2 = new DatanodeDescriptor(dnId2, "rack2");
// Update the DatanodeDescriptors with their attached storages.
dnDesc1.updateStorage(new DatanodeStorage("dnStorage1"));
dnDesc2.updateStorage(new DatanodeStorage("dnStorage2"));
BlockManagerTestUtil.updateStorage(dnDesc1, new DatanodeStorage("dnStorage1"));
BlockManagerTestUtil.updateStorage(dnDesc2, new DatanodeStorage("dnStorage2"));
StorageReport[] report1 = new StorageReport[] {
new StorageReport("dnStorage1", false, 1024, 100, 924, 100)

View File

@ -324,7 +324,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
private static class SimulatedStorage {
private Map<String, SimulatedBPStorage> map =
new HashMap<String, SimulatedBPStorage>();
private final String storageUuid = "SimulatedStroage-" + DatanodeStorage.newStorageID();
private final String storageUuid = "SimulatedStroage-" + DatanodeStorage.generateUuid();
private final long capacity; // in bytes
@ -470,8 +470,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
}
@Override
public synchronized BlockListAsLongs getBlockReport(String bpid) {
synchronized BlockListAsLongs getBlockReport(String bpid) {
final List<Block> blocks = new ArrayList<Block>();
final Map<Block, BInfo> map = blockMap.get(bpid);
if (map != null) {
@ -684,7 +683,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
}
@Override // FsDatasetSpi
public Replica recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
public String recoverClose(ExtendedBlock b, long newGS, long expectedBlockLen)
throws IOException {
final Map<Block, BInfo> map = getMap(b.getBlockPoolId());
BInfo binfo = map.get(b.getLocalBlock());
@ -698,7 +697,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
map.remove(b.getLocalBlock());
binfo.theBlock.setGenerationStamp(newGS);
map.put(binfo.theBlock, binfo);
return binfo;
return binfo.getStorageUuid();
}
@Override // FsDatasetSpi

View File

@ -35,7 +35,6 @@ import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.Collection;
@ -46,9 +45,18 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.hdfs.*;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
@ -73,7 +81,10 @@ import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.log4j.Level;
import org.junit.*;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
@ -125,7 +136,7 @@ public class TestBlockRecovery {
File dataDir = new File(DATA_DIR);
FileUtil.fullyDelete(dataDir);
dataDir.mkdirs();
StorageLocation location = new StorageLocation(new URI(dataDir.getPath()));
StorageLocation location = StorageLocation.parse(dataDir.getPath());
locations.add(location);
final DatanodeProtocolClientSideTranslatorPB namenode =
mock(DatanodeProtocolClientSideTranslatorPB.class);

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.hdfs.server.datanode;
import java.io.*;
import java.net.URI;
import java.util.*;
import org.apache.hadoop.conf.Configuration;
@ -40,7 +39,7 @@ public class TestDataDirs {
@Test (timeout = 30000)
public void testDataDirParsing() throws Throwable {
Configuration conf = new Configuration();
ArrayList<StorageLocation> locations;
List<StorageLocation> locations;
File dir0 = new File("/dir0");
File dir1 = new File("/dir1");
File dir2 = new File("/dir2");
@ -50,7 +49,7 @@ public class TestDataDirs {
// type is not case-sensitive
String locations1 = "[disk]/dir0,[DISK]/dir1,[sSd]/dir2,[disK]/dir3";
conf.set(DFS_DATANODE_DATA_DIR_KEY, locations1);
locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf));
locations = DataNode.getStorageLocations(conf);
assertThat(locations.size(), is(4));
assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
assertThat(locations.get(0).getUri(), is(dir0.toURI()));
@ -61,23 +60,21 @@ public class TestDataDirs {
assertThat(locations.get(3).getStorageType(), is(StorageType.DISK));
assertThat(locations.get(3).getUri(), is(dir3.toURI()));
// Verify that an unrecognized storage type is ignored.
// Verify that an unrecognized storage type result in an exception.
String locations2 = "[BadMediaType]/dir0,[ssd]/dir1,[disk]/dir2";
conf.set(DFS_DATANODE_DATA_DIR_KEY, locations2);
locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf));
assertThat(locations.size(), is(3));
assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
assertThat(locations.get(0).getUri(), is(dir0.toURI()));
assertThat(locations.get(1).getStorageType(), is(StorageType.SSD));
assertThat(locations.get(1).getUri(), is(dir1.toURI()));
assertThat(locations.get(2).getStorageType(), is(StorageType.DISK));
assertThat(locations.get(2).getUri(), is(dir2.toURI()));
try {
locations = DataNode.getStorageLocations(conf);
fail();
} catch(IllegalArgumentException iae) {
DataNode.LOG.info("The exception is expected.", iae);
}
// Assert that a string with no storage type specified is
// correctly parsed and the default storage type is picked up.
String locations3 = "/dir0,/dir1";
conf.set(DFS_DATANODE_DATA_DIR_KEY, locations3);
locations = new ArrayList<StorageLocation>(DataNode.getStorageLocations(conf));
locations = DataNode.getStorageLocations(conf);
assertThat(locations.size(), is(2));
assertThat(locations.get(0).getStorageType(), is(StorageType.DISK));
assertThat(locations.get(0).getUri(), is(dir0.toURI()));
@ -94,11 +91,11 @@ public class TestDataDirs {
LocalFileSystem fs = mock(LocalFileSystem.class);
AbstractList<StorageLocation> locations = new ArrayList<StorageLocation>();
locations.add(new StorageLocation(new URI("file:/p1/")));
locations.add(new StorageLocation(new URI("file:/p2/")));
locations.add(new StorageLocation(new URI("file:/p3/")));
locations.add(StorageLocation.parse("file:/p1/"));
locations.add(StorageLocation.parse("file:/p2/"));
locations.add(StorageLocation.parse("file:/p3/"));
ArrayList<StorageLocation> checkedLocations =
List<StorageLocation> checkedLocations =
DataNode.checkStorageLocations(locations, fs, diskChecker);
assertEquals("number of valid data dirs", 1, checkedLocations.size());
String validDir = checkedLocations.iterator().next().getFile().getPath();