HDFS-5480. Update Balancer for HDFS-2832. (Contributed by szetszwo)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-2832@1540547 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arpit Agarwal 2013-11-10 20:59:32 +00:00
parent fccbb5072c
commit 907fb15ee8
12 changed files with 155 additions and 85 deletions

View File

@ -91,3 +91,5 @@ IMPROVEMENTS:
HDFS-5481. Fix TestDataNodeVolumeFailure in branch HDFS-2832. (Contributed HDFS-5481. Fix TestDataNodeVolumeFailure in branch HDFS-2832. (Contributed
by Junping Du) by Junping Du)
HDFS-5480. Update Balancer for HDFS-2832. (Contributed by szetszwo)

View File

@ -42,7 +42,6 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlo
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReportBadBlocksResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageBlockReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReceivedDeletedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.StorageReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeIDProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.VersionRequestProto;
@ -57,7 +56,6 @@ import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import com.google.common.primitives.Longs;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -103,14 +101,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements
HeartbeatRequestProto request) throws ServiceException { HeartbeatRequestProto request) throws ServiceException {
HeartbeatResponse response; HeartbeatResponse response;
try { try {
List<StorageReportProto> list = request.getReportsList(); final StorageReport[] report = PBHelper.convertStorageReports(
StorageReport[] report = new StorageReport[list.size()]; request.getReportsList());
int i = 0;
for (StorageReportProto p : list) {
report[i++] = new StorageReport(p.getStorageUuid(), p.getFailed(),
p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
p.getBlockPoolUsed());
}
response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()), response = impl.sendHeartbeat(PBHelper.convert(request.getRegistration()),
report, request.getDnCacheCapacity(), request.getDnCacheUsed(), report, request.getDnCacheCapacity(), request.getDnCacheUsed(),
request.getXmitsInProgress(), request.getXmitsInProgress(),

View File

@ -25,41 +25,40 @@ import java.util.Arrays;
import java.util.EnumSet; import java.util.EnumSet;
import java.util.List; import java.util.List;
import com.google.common.base.Preconditions;
import org.apache.hadoop.fs.ContentSummary; import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag; import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.proto.HAServiceProtocolProtos; import org.apache.hadoop.ha.proto.HAServiceProtocolProtos;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks; import org.apache.hadoop.hdfs.protocol.CorruptFileBlocks;
import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates; import org.apache.hadoop.hdfs.protocol.DatanodeInfo.AdminStates;
import org.apache.hadoop.hdfs.protocol.DirectoryListing; import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction; import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
import org.apache.hadoop.hdfs.protocol.PathBasedCacheDirective;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffReportEntry;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport.DiffType;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus; import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateFlagProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.DatanodeReportTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.GetFsStatsResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveInfoProto; import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.PathBasedCacheDirectiveInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.SafeModeActionProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BalancerBandwidthCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockIdCommandProto;
@ -98,8 +97,6 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.HdfsFileStatusProto.File
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlockProto.Builder;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlocksProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.LocatedBlocksProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.NamenodeRegistrationProto.NamenodeRoleProto;
@ -113,6 +110,8 @@ import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshotDiffReportProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryListingProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryListingProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.SnapshottableDirectoryStatusProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto; import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageTypeProto;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.StorageUuidsProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto; import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.security.token.block.BlockKey; import org.apache.hadoop.hdfs.security.token.block.BlockKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
@ -288,12 +287,16 @@ public class PBHelper {
public static BlockWithLocationsProto convert(BlockWithLocations blk) { public static BlockWithLocationsProto convert(BlockWithLocations blk) {
return BlockWithLocationsProto.newBuilder() return BlockWithLocationsProto.newBuilder()
.setBlock(convert(blk.getBlock())) .setBlock(convert(blk.getBlock()))
.addAllDatanodeUuids(Arrays.asList(blk.getDatanodeUuids()))
.addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build(); .addAllStorageUuids(Arrays.asList(blk.getStorageIDs())).build();
} }
public static BlockWithLocations convert(BlockWithLocationsProto b) { public static BlockWithLocations convert(BlockWithLocationsProto b) {
return new BlockWithLocations(convert(b.getBlock()), b.getStorageUuidsList() final List<String> datanodeUuids = b.getDatanodeUuidsList();
.toArray(new String[0])); final List<String> storageUuids = b.getStorageUuidsList();
return new BlockWithLocations(convert(b.getBlock()),
datanodeUuids.toArray(new String[datanodeUuids.size()]),
storageUuids.toArray(new String[storageUuids.size()]));
} }
public static BlocksWithLocationsProto convert(BlocksWithLocations blks) { public static BlocksWithLocationsProto convert(BlocksWithLocations blks) {
@ -1535,6 +1538,21 @@ public class PBHelper {
return builder.build(); return builder.build();
} }
public static StorageReport convert(StorageReportProto p) {
return new StorageReport(p.getStorageUuid(), p.getFailed(),
p.getCapacity(), p.getDfsUsed(), p.getRemaining(),
p.getBlockPoolUsed());
}
public static StorageReport[] convertStorageReports(
List<StorageReportProto> list) {
final StorageReport[] report = new StorageReport[list.size()];
for (int i = 0; i < report.length; i++) {
report[i] = convert(list.get(i));
}
return report;
}
public static JournalInfo convert(JournalInfoProto info) { public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0; int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed; import static org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed;
import java.io.BufferedInputStream; import java.io.BufferedInputStream;
@ -221,9 +220,9 @@ public class Balancer {
private Map<Block, BalancerBlock> globalBlockList private Map<Block, BalancerBlock> globalBlockList
= new HashMap<Block, BalancerBlock>(); = new HashMap<Block, BalancerBlock>();
private MovedBlocks movedBlocks = new MovedBlocks(); private MovedBlocks movedBlocks = new MovedBlocks();
// Map storage IDs to BalancerDatanodes /** Map (datanodeUuid -> BalancerDatanodes) */
private Map<String, BalancerDatanode> datanodes private final Map<String, BalancerDatanode> datanodeMap
= new HashMap<String, BalancerDatanode>(); = new HashMap<String, BalancerDatanode>();
private NetworkTopology cluster; private NetworkTopology cluster;
@ -241,6 +240,14 @@ public class Balancer {
private PendingBlockMove() { private PendingBlockMove() {
} }
@Override
public String toString() {
final Block b = block.getBlock();
return b + " with size=" + b.getNumBytes() + " from "
+ source.getDisplayName() + " to " + target.getDisplayName()
+ " through " + proxySource.getDisplayName();
}
/* choose a block & a proxy source for this pendingMove /* choose a block & a proxy source for this pendingMove
* whose source & target have already been chosen. * whose source & target have already been chosen.
* *
@ -272,11 +279,7 @@ public class Balancer {
if ( chooseProxySource() ) { if ( chooseProxySource() ) {
movedBlocks.add(block); movedBlocks.add(block);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Decided to move block "+ block.getBlockId() LOG.debug("Decided to move " + this);
+" with a length of "+StringUtils.byteDesc(block.getNumBytes())
+ " bytes from " + source.getDisplayName()
+ " to " + target.getDisplayName()
+ " using proxy source " + proxySource.getDisplayName() );
} }
return true; return true;
} }
@ -352,17 +355,9 @@ public class Balancer {
sendRequest(out); sendRequest(out);
receiveResponse(in); receiveResponse(in);
bytesMoved.inc(block.getNumBytes()); bytesMoved.inc(block.getNumBytes());
LOG.info( "Moving block " + block.getBlock().getBlockId() + LOG.info("Successfully moved " + this);
" from "+ source.getDisplayName() + " to " +
target.getDisplayName() + " through " +
proxySource.getDisplayName() +
" is succeeded." );
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Error moving block "+block.getBlockId()+ LOG.warn("Failed to move " + this + ": " + e.getMessage());
" from " + source.getDisplayName() + " to " +
target.getDisplayName() + " through " +
proxySource.getDisplayName() +
": "+e.getMessage());
} finally { } finally {
IOUtils.closeStream(out); IOUtils.closeStream(out);
IOUtils.closeStream(in); IOUtils.closeStream(in);
@ -414,9 +409,7 @@ public class Balancer {
@Override @Override
public void run() { public void run() {
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Starting moving "+ block.getBlockId() + LOG.debug("Start moving " + PendingBlockMove.this);
" from " + proxySource.getDisplayName() + " to " +
target.getDisplayName());
} }
dispatch(); dispatch();
} }
@ -463,11 +456,6 @@ public class Balancer {
return block; return block;
} }
/* Return the block id */
private long getBlockId() {
return block.getBlockId();
}
/* Return the length of the block */ /* Return the length of the block */
private long getNumBytes() { private long getNumBytes() {
return block.getNumBytes(); return block.getNumBytes();
@ -674,10 +662,10 @@ public class Balancer {
synchronized (block) { synchronized (block) {
// update locations // update locations
for ( String storageID : blk.getStorageIDs() ) { for (String datanodeUuid : blk.getDatanodeUuids()) {
BalancerDatanode datanode = datanodes.get(storageID); final BalancerDatanode d = datanodeMap.get(datanodeUuid);
if (datanode != null) { // not an unknown datanode if (datanode != null) { // not an unknown datanode
block.addLocation(datanode); block.addLocation(d);
} }
} }
} }
@ -911,13 +899,13 @@ public class Balancer {
datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0); datanodeS.utilization)*datanodeS.datanode.getCapacity()/100.0);
} }
} }
this.datanodes.put(datanode.getDatanodeUuid(), datanodeS); datanodeMap.put(datanode.getDatanodeUuid(), datanodeS);
} }
//logging //logging
logNodes(); logNodes();
assert (this.datanodes.size() == assert (this.datanodeMap.size() ==
overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+ overUtilizedDatanodes.size()+underUtilizedDatanodes.size()+
aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size()) aboveAvgUtilizedDatanodes.size()+belowAvgUtilizedDatanodes.size())
: "Mismatched number of datanodes"; : "Mismatched number of datanodes";
@ -989,9 +977,9 @@ public class Balancer {
// At last, match all remaining nodes // At last, match all remaining nodes
chooseNodes(ANY_OTHER); chooseNodes(ANY_OTHER);
assert (datanodes.size() >= sources.size()+targets.size()) assert (datanodeMap.size() >= sources.size()+targets.size())
: "Mismatched number of datanodes (" + : "Mismatched number of datanodes (" +
datanodes.size() + " total, " + datanodeMap.size() + " total, " +
sources.size() + " sources, " + sources.size() + " sources, " +
targets.size() + " targets)"; targets.size() + " targets)";
@ -1292,7 +1280,7 @@ public class Balancer {
this.aboveAvgUtilizedDatanodes.clear(); this.aboveAvgUtilizedDatanodes.clear();
this.belowAvgUtilizedDatanodes.clear(); this.belowAvgUtilizedDatanodes.clear();
this.underUtilizedDatanodes.clear(); this.underUtilizedDatanodes.clear();
this.datanodes.clear(); this.datanodeMap.clear();
this.sources.clear(); this.sources.clear();
this.targets.clear(); this.targets.clear();
this.policy.reset(); this.policy.reset();

View File

@ -694,17 +694,17 @@ public class BlockManager {
/** /**
* Get all valid locations of the block * Get all valid locations of the block
*/ */
private List<String> getValidLocations(Block block) { private List<DatanodeStorageInfo> getValidLocations(Block block) {
ArrayList<String> machineSet = final List<DatanodeStorageInfo> locations
new ArrayList<String>(blocksMap.numNodes(block)); = new ArrayList<DatanodeStorageInfo>(blocksMap.numNodes(block));
for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) { for(DatanodeStorageInfo storage : blocksMap.getStorages(block)) {
final String storageID = storage.getStorageID(); final String storageID = storage.getStorageID();
// filter invalidate replicas // filter invalidate replicas
if(!invalidateBlocks.contains(storageID, block)) { if(!invalidateBlocks.contains(storageID, block)) {
machineSet.add(storageID); locations.add(storage);
} }
} }
return machineSet; return locations;
} }
private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks, private List<LocatedBlock> createLocatedBlockList(final BlockInfo[] blocks,
@ -2622,12 +2622,18 @@ assert storedBlock.findDatanode(dn) < 0 : "Block " + block
* return the length of the added block; 0 if the block is not added * return the length of the added block; 0 if the block is not added
*/ */
private long addBlock(Block block, List<BlockWithLocations> results) { private long addBlock(Block block, List<BlockWithLocations> results) {
final List<String> machineSet = getValidLocations(block); final List<DatanodeStorageInfo> locations = getValidLocations(block);
if(machineSet.size() == 0) { if(locations.size() == 0) {
return 0; return 0;
} else { } else {
results.add(new BlockWithLocations(block, final String[] datanodeUuids = new String[locations.size()];
machineSet.toArray(new String[machineSet.size()]))); final String[] storageIDs = new String[datanodeUuids.length];
for(int i = 0; i < locations.size(); i++) {
final DatanodeStorageInfo s = locations.get(i);
datanodeUuids[i] = s.getDatanodeDescriptor().getDatanodeUuid();
storageIDs[i] = s.getStorageID();
}
results.add(new BlockWithLocations(block, datanodeUuids, storageIDs));
return block.getNumBytes(); return block.getNumBytes();
} }
} }

View File

@ -866,7 +866,8 @@ class DataXceiver extends Receiver implements Runnable {
datanode.notifyNamenodeReceivedBlock( datanode.notifyNamenodeReceivedBlock(
block, delHint, blockReceiver.getReplicaInfo().getStorageUuid()); block, delHint, blockReceiver.getReplicaInfo().getStorageUuid());
LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()); LOG.info("Moved " + block + " from " + peer.getRemoteAddressString()
+ ", delHint=" + delHint);
} catch (IOException ioe) { } catch (IOException ioe) {
opStatus = ERROR; opStatus = ERROR;

View File

@ -17,6 +17,8 @@
*/ */
package org.apache.hadoop.hdfs.server.protocol; package org.apache.hadoop.hdfs.server.protocol;
import java.util.Arrays;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.Block;
@ -34,12 +36,14 @@ public class BlocksWithLocations {
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Evolving @InterfaceStability.Evolving
public static class BlockWithLocations { public static class BlockWithLocations {
Block block; final Block block;
String storageIDs[]; final String[] datanodeUuids;
final String[] storageIDs;
/** constructor */ /** constructor */
public BlockWithLocations(Block block, String[] storageIDs) { public BlockWithLocations(Block block, String[] datanodeUuids, String[] storageIDs) {
this.block = block; this.block = block;
this.datanodeUuids = datanodeUuids;
this.storageIDs = storageIDs; this.storageIDs = storageIDs;
} }
@ -48,10 +52,30 @@ public class BlocksWithLocations {
return block; return block;
} }
/** get the block's locations */ /** get the block's datanode locations */
public String[] getDatanodeUuids() {
return datanodeUuids;
}
/** get the block's storage locations */
public String[] getStorageIDs() { public String[] getStorageIDs() {
return storageIDs; return storageIDs;
} }
@Override
public String toString() {
final StringBuilder b = new StringBuilder();
b.append(block);
if (datanodeUuids.length == 0) {
return b.append("[]").toString();
}
b.append(storageIDs[0]).append('@').append(datanodeUuids[0]);
for(int i = 1; i < datanodeUuids.length; i++) {
b.append(", ").append(storageIDs[i]).append("@").append(datanodeUuids[i]);
}
return b.append("]").toString();
}
} }
private BlockWithLocations[] blocks; private BlockWithLocations[] blocks;

View File

@ -355,7 +355,8 @@ message BlockProto {
*/ */
message BlockWithLocationsProto { message BlockWithLocationsProto {
required BlockProto block = 1; // Block required BlockProto block = 1; // Block
repeated string storageUuids = 2; // Datanodes with replicas of the block repeated string datanodeUuids = 2; // Datanodes with replicas of the block
repeated string storageUuids = 3; // Storages with replicas of the block
} }
/** /**

View File

@ -328,9 +328,10 @@ public class MiniDFSCluster {
builder.nameNodePort, builder.nameNodeHttpPort); builder.nameNodePort, builder.nameNodeHttpPort);
} }
LOG.info("starting cluster with " + final int numNameNodes = builder.nnTopology.countNameNodes();
builder.nnTopology.countNameNodes() + " namenodes."); LOG.info("starting cluster: numNameNodes=" + numNameNodes
nameNodes = new NameNodeInfo[builder.nnTopology.countNameNodes()]; + ", numDataNodes=" + builder.numDataNodes);
nameNodes = new NameNodeInfo[numNameNodes];
initMiniDFSCluster(builder.conf, initMiniDFSCluster(builder.conf,
builder.numDataNodes, builder.numDataNodes,
@ -1920,12 +1921,14 @@ public class MiniDFSCluster {
// Wait for expected number of datanodes to start // Wait for expected number of datanodes to start
if (dnInfo.length != numDataNodes) { if (dnInfo.length != numDataNodes) {
LOG.info("dnInfo.length != numDataNodes");
return true; return true;
} }
// if one of the data nodes is not fully started, continue to wait // if one of the data nodes is not fully started, continue to wait
for (DataNodeProperties dn : dataNodes) { for (DataNodeProperties dn : dataNodes) {
if (!dn.datanode.isDatanodeFullyStarted()) { if (!dn.datanode.isDatanodeFullyStarted()) {
LOG.info("!dn.datanode.isDatanodeFullyStarted()");
return true; return true;
} }
} }
@ -1934,6 +1937,7 @@ public class MiniDFSCluster {
// using (capacity == 0) as proxy. // using (capacity == 0) as proxy.
for (DatanodeInfo dn : dnInfo) { for (DatanodeInfo dn : dnInfo) {
if (dn.getCapacity() == 0) { if (dn.getCapacity() == 0) {
LOG.info("dn.getCapacity() == 0");
return true; return true;
} }
} }
@ -1941,6 +1945,7 @@ public class MiniDFSCluster {
// If datanode dataset is not initialized then wait // If datanode dataset is not initialized then wait
for (DataNodeProperties dn : dataNodes) { for (DataNodeProperties dn : dataNodes) {
if (DataNodeTestUtils.getFSDataset(dn.datanode) == null) { if (DataNodeTestUtils.getFSDataset(dn.datanode) == null) {
LOG.info("DataNodeTestUtils.getFSDataset(dn.datanode) == null");
return true; return true;
} }
} }

View File

@ -166,8 +166,10 @@ public class TestPBHelper {
} }
private static BlockWithLocations getBlockWithLocations(int bid) { private static BlockWithLocations getBlockWithLocations(int bid) {
return new BlockWithLocations(new Block(bid, 0, 1), new String[] { "dn1", final String[] datanodeUuids = {"dn1", "dn2", "dn3"};
"dn2", "dn3" }); final String[] storageIDs = {"s1", "s2", "s3"};
return new BlockWithLocations(new Block(bid, 0, 1),
datanodeUuids, storageIDs);
} }
private void compare(BlockWithLocations locs1, BlockWithLocations locs2) { private void compare(BlockWithLocations locs1, BlockWithLocations locs2) {

View File

@ -31,6 +31,7 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
@ -50,6 +51,7 @@ import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.log4j.Level;
import org.junit.Test; import org.junit.Test;
/** /**
@ -58,6 +60,9 @@ import org.junit.Test;
public class TestBalancer { public class TestBalancer {
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.TestBalancer"); "org.apache.hadoop.hdfs.TestBalancer");
static {
((Log4JLogger)Balancer.LOG).getLogger().setLevel(Level.ALL);
}
final static long CAPACITY = 500L; final static long CAPACITY = 500L;
final static String RACK0 = "/rack0"; final static String RACK0 = "/rack0";
@ -292,6 +297,16 @@ public class TestBalancer {
} while (!balanced); } while (!balanced);
} }
String long2String(long[] array) {
if (array.length == 0) {
return "<empty>";
}
StringBuilder b = new StringBuilder("[").append(array[0]);
for(int i = 1; i < array.length; i++) {
b.append(", ").append(array[i]);
}
return b.append("]").toString();
}
/** This test start a cluster with specified number of nodes, /** This test start a cluster with specified number of nodes,
* and fills it to be 30% full (with a single file replicated identically * and fills it to be 30% full (with a single file replicated identically
* to all datanodes); * to all datanodes);
@ -308,6 +323,11 @@ public class TestBalancer {
*/ */
private void doTest(Configuration conf, long[] capacities, String[] racks, private void doTest(Configuration conf, long[] capacities, String[] racks,
long newCapacity, String newRack, boolean useTool) throws Exception { long newCapacity, String newRack, boolean useTool) throws Exception {
LOG.info("capacities = " + long2String(capacities));
LOG.info("racks = " + Arrays.asList(racks));
LOG.info("newCapacity= " + newCapacity);
LOG.info("newRack = " + newRack);
LOG.info("useTool = " + useTool);
assertEquals(capacities.length, racks.length); assertEquals(capacities.length, racks.length);
int numOfDatanodes = capacities.length; int numOfDatanodes = capacities.length;
cluster = new MiniDFSCluster.Builder(conf) cluster = new MiniDFSCluster.Builder(conf)

View File

@ -21,7 +21,11 @@ import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.io.InputStream; import java.io.InputStream;
import java.io.OutputStream; import java.io.OutputStream;
import java.util.*; import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import javax.management.NotCompliantMBeanException; import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName; import javax.management.ObjectName;
@ -43,6 +47,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs; import org.apache.hadoop.hdfs.server.datanode.fsdataset.RollingLogs;
import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean; import org.apache.hadoop.hdfs.server.datanode.metrics.FSDatasetMBean;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
@ -319,15 +324,15 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
private static class SimulatedStorage { private static class SimulatedStorage {
private Map<String, SimulatedBPStorage> map = private Map<String, SimulatedBPStorage> map =
new HashMap<String, SimulatedBPStorage>(); new HashMap<String, SimulatedBPStorage>();
private final String storageUuid = "SimulatedStorage-UUID"; private final String storageUuid = "SimulatedStroage-" + DatanodeStorage.newStorageID();
private long capacity; // in bytes private final long capacity; // in bytes
synchronized long getFree() { synchronized long getFree() {
return capacity - getUsed(); return capacity - getUsed();
} }
synchronized long getCapacity() { long getCapacity() {
return capacity; return capacity;
} }
@ -383,9 +388,14 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
return bpStorage; return bpStorage;
} }
public String getStorageUuid() { String getStorageUuid() {
return storageUuid; return storageUuid;
} }
synchronized StorageReport getStorageReport(String bpid) {
return new StorageReport(getStorageUuid(), false, getCapacity(),
getUsed(), getFree(), map.get(bpid).getUsed());
}
} }
private final Map<String, Map<Block, BInfo>> blockMap private final Map<String, Map<Block, BInfo>> blockMap
@ -400,8 +410,9 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
} }
this.datanodeUuid = storage.getDatanodeUuid(); this.datanodeUuid = storage.getDatanodeUuid();
} else { } else {
this.datanodeUuid = "unknownStorageId-" + UUID.randomUUID(); this.datanodeUuid = "SimulatedDatanode-" + DataNode.generateUuid();
} }
registerMBean(datanodeUuid); registerMBean(datanodeUuid);
this.storage = new SimulatedStorage( this.storage = new SimulatedStorage(
conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY)); conf.getLong(CONFIG_PROPERTY_CAPACITY, DEFAULT_CAPACITY));
@ -478,7 +489,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
String bpid) { String bpid) {
Map<String, BlockListAsLongs> reports = Map<String, BlockListAsLongs> reports =
new HashMap<String, BlockListAsLongs>(); new HashMap<String, BlockListAsLongs>();
reports.put("", getBlockReport(bpid)); reports.put(storage.storageUuid, getBlockReport(bpid));
return reports; return reports;
} }
@ -1029,7 +1040,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override @Override
public StorageReport[] getStorageReports(String bpid) { public StorageReport[] getStorageReports(String bpid) {
return new StorageReport[0]; return new StorageReport[] {storage.getStorageReport(bpid)};
} }
@Override @Override