HDFS-6911. Archival Storage: check if a block is already scheduled in Mover. Contributed by Tsz Wo Nicholas Sze.
This commit is contained in:
parent
555900a9dc
commit
8ea20b53a8
|
@ -175,11 +175,7 @@ public class Dispatcher {
|
||||||
private DDatanode proxySource;
|
private DDatanode proxySource;
|
||||||
private StorageGroup target;
|
private StorageGroup target;
|
||||||
|
|
||||||
private PendingMove() {
|
private PendingMove(Source source, StorageGroup target) {
|
||||||
}
|
|
||||||
|
|
||||||
public PendingMove(DBlock block, Source source, StorageGroup target) {
|
|
||||||
this.block = block;
|
|
||||||
this.source = source;
|
this.source = source;
|
||||||
this.target = target;
|
this.target = target;
|
||||||
}
|
}
|
||||||
|
@ -199,9 +195,11 @@ public class Dispatcher {
|
||||||
* @return true if a block and its proxy are chosen; false otherwise
|
* @return true if a block and its proxy are chosen; false otherwise
|
||||||
*/
|
*/
|
||||||
private boolean chooseBlockAndProxy() {
|
private boolean chooseBlockAndProxy() {
|
||||||
|
// source and target must have the same storage type
|
||||||
|
final StorageType t = source.getStorageType();
|
||||||
// iterate all source's blocks until find a good one
|
// iterate all source's blocks until find a good one
|
||||||
for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
|
for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
|
||||||
if (markMovedIfGoodBlock(i.next())) {
|
if (markMovedIfGoodBlock(i.next(), t)) {
|
||||||
i.remove();
|
i.remove();
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -212,10 +210,10 @@ public class Dispatcher {
|
||||||
/**
|
/**
|
||||||
* @return true if the given block is good for the tentative move.
|
* @return true if the given block is good for the tentative move.
|
||||||
*/
|
*/
|
||||||
private boolean markMovedIfGoodBlock(DBlock block) {
|
private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) {
|
||||||
synchronized (block) {
|
synchronized (block) {
|
||||||
synchronized (movedBlocks) {
|
synchronized (movedBlocks) {
|
||||||
if (isGoodBlockCandidate(source, target, block)) {
|
if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
|
||||||
this.block = block;
|
this.block = block;
|
||||||
if (chooseProxySource()) {
|
if (chooseProxySource()) {
|
||||||
movedBlocks.put(block);
|
movedBlocks.put(block);
|
||||||
|
@ -235,7 +233,7 @@ public class Dispatcher {
|
||||||
*
|
*
|
||||||
* @return true if a proxy is found; otherwise false
|
* @return true if a proxy is found; otherwise false
|
||||||
*/
|
*/
|
||||||
public boolean chooseProxySource() {
|
private boolean chooseProxySource() {
|
||||||
final DatanodeInfo targetDN = target.getDatanodeInfo();
|
final DatanodeInfo targetDN = target.getDatanodeInfo();
|
||||||
// if node group is supported, first try add nodes in the same node group
|
// if node group is supported, first try add nodes in the same node group
|
||||||
if (cluster.isNodeGroupAware()) {
|
if (cluster.isNodeGroupAware()) {
|
||||||
|
@ -440,6 +438,18 @@ public class Dispatcher {
|
||||||
scheduledSize = 0L;
|
scheduledSize = 0L;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private PendingMove addPendingMove(DBlock block, final PendingMove pm) {
|
||||||
|
if (getDDatanode().addPendingBlock(pm)) {
|
||||||
|
if (pm.markMovedIfGoodBlock(block, getStorageType())) {
|
||||||
|
incScheduledSize(pm.block.getNumBytes());
|
||||||
|
return pm;
|
||||||
|
} else {
|
||||||
|
getDDatanode().removePendingBlock(pm);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
/** @return the name for display */
|
/** @return the name for display */
|
||||||
String getDisplayName() {
|
String getDisplayName() {
|
||||||
return datanode + ":" + storageType;
|
return datanode + ":" + storageType;
|
||||||
|
@ -599,8 +609,11 @@ public class Dispatcher {
|
||||||
|
|
||||||
/** Decide if the given block is a good candidate to move or not */
|
/** Decide if the given block is a good candidate to move or not */
|
||||||
private boolean isGoodBlockCandidate(DBlock block) {
|
private boolean isGoodBlockCandidate(DBlock block) {
|
||||||
|
// source and target must have the same storage type
|
||||||
|
final StorageType sourceStorageType = getStorageType();
|
||||||
for (Task t : tasks) {
|
for (Task t : tasks) {
|
||||||
if (Dispatcher.this.isGoodBlockCandidate(this, t.target, block)) {
|
if (Dispatcher.this.isGoodBlockCandidate(this, t.target,
|
||||||
|
sourceStorageType, block)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -620,11 +633,9 @@ public class Dispatcher {
|
||||||
for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
|
for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
|
||||||
final Task task = i.next();
|
final Task task = i.next();
|
||||||
final DDatanode target = task.target.getDDatanode();
|
final DDatanode target = task.target.getDDatanode();
|
||||||
PendingMove pendingBlock = new PendingMove();
|
final PendingMove pendingBlock = new PendingMove(this, task.target);
|
||||||
if (target.addPendingBlock(pendingBlock)) {
|
if (target.addPendingBlock(pendingBlock)) {
|
||||||
// target is not busy, so do a tentative block allocation
|
// target is not busy, so do a tentative block allocation
|
||||||
pendingBlock.source = this;
|
|
||||||
pendingBlock.target = task.target;
|
|
||||||
if (pendingBlock.chooseBlockAndProxy()) {
|
if (pendingBlock.chooseBlockAndProxy()) {
|
||||||
long blockSize = pendingBlock.block.getNumBytes();
|
long blockSize = pendingBlock.block.getNumBytes();
|
||||||
incScheduledSize(-blockSize);
|
incScheduledSize(-blockSize);
|
||||||
|
@ -641,6 +652,11 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Add a pending move */
|
||||||
|
public PendingMove addPendingMove(DBlock block, StorageGroup target) {
|
||||||
|
return target.addPendingMove(block, new PendingMove(this, target));
|
||||||
|
}
|
||||||
|
|
||||||
/** Iterate all source's blocks to remove moved ones */
|
/** Iterate all source's blocks to remove moved ones */
|
||||||
private void removeMovedBlocks() {
|
private void removeMovedBlocks() {
|
||||||
|
@ -901,12 +917,6 @@ public class Dispatcher {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
|
|
||||||
DBlock block) {
|
|
||||||
// match source and target storage type
|
|
||||||
return isGoodBlockCandidate(source, target, source.getStorageType(), block);
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Decide if the block is a good candidate to be moved from source to target.
|
* Decide if the block is a good candidate to be moved from source to target.
|
||||||
* A block is a good candidate if
|
* A block is a good candidate if
|
||||||
|
@ -914,7 +924,7 @@ public class Dispatcher {
|
||||||
* 2. the block does not have a replica on the target;
|
* 2. the block does not have a replica on the target;
|
||||||
* 3. doing the move does not reduce the number of racks that the block has
|
* 3. doing the move does not reduce the number of racks that the block has
|
||||||
*/
|
*/
|
||||||
public boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
|
private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
|
||||||
StorageType targetStorageType, DBlock block) {
|
StorageType targetStorageType, DBlock block) {
|
||||||
if (target.storageType != targetStorageType) {
|
if (target.storageType != targetStorageType) {
|
||||||
return false;
|
return false;
|
||||||
|
|
|
@ -31,6 +31,11 @@ public interface Matcher {
|
||||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||||
return cluster.isOnSameNodeGroup(left, right);
|
return cluster.isOnSameNodeGroup(left, right);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "SAME_NODE_GROUP";
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Match datanodes in the same rack. */
|
/** Match datanodes in the same rack. */
|
||||||
|
@ -39,6 +44,11 @@ public interface Matcher {
|
||||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||||
return cluster.isOnSameRack(left, right);
|
return cluster.isOnSameRack(left, right);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "SAME_RACK";
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
/** Match any datanode with any other datanode. */
|
/** Match any datanode with any other datanode. */
|
||||||
|
@ -47,5 +57,10 @@ public interface Matcher {
|
||||||
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
public boolean match(NetworkTopology cluster, Node left, Node right) {
|
||||||
return left != right;
|
return left != right;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "ANY_OTHER";
|
||||||
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
|
@ -50,7 +50,7 @@ import java.util.*;
|
||||||
public class Mover {
|
public class Mover {
|
||||||
static final Log LOG = LogFactory.getLog(Mover.class);
|
static final Log LOG = LogFactory.getLog(Mover.class);
|
||||||
|
|
||||||
private static final Path MOVER_ID_PATH = new Path("/system/mover.id");
|
static final Path MOVER_ID_PATH = new Path("/system/mover.id");
|
||||||
|
|
||||||
private static class StorageMap {
|
private static class StorageMap {
|
||||||
private final StorageGroupMap<Source> sources
|
private final StorageGroupMap<Source> sources
|
||||||
|
@ -111,22 +111,25 @@ public class Mover {
|
||||||
this.storages = new StorageMap();
|
this.storages = new StorageMap();
|
||||||
this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
|
this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
private ExitStatus run() {
|
void init() throws IOException {
|
||||||
try {
|
final List<DatanodeStorageReport> reports = dispatcher.init();
|
||||||
final List<DatanodeStorageReport> reports = dispatcher.init();
|
for(DatanodeStorageReport r : reports) {
|
||||||
for(DatanodeStorageReport r : reports) {
|
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
|
||||||
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
|
for(StorageType t : StorageType.asList()) {
|
||||||
for(StorageType t : StorageType.asList()) {
|
final long maxRemaining = getMaxRemaining(r, t);
|
||||||
final long maxRemaining = getMaxRemaining(r, t);
|
if (maxRemaining > 0L) {
|
||||||
if (maxRemaining > 0L) {
|
final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
|
||||||
final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
|
final StorageGroup target = dn.addTarget(t, maxRemaining);
|
||||||
final StorageGroup target = dn.addTarget(t, maxRemaining);
|
storages.add(source, target);
|
||||||
storages.add(source, target);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private ExitStatus run() {
|
||||||
|
try {
|
||||||
|
init();
|
||||||
new Processor().processNamespace();
|
new Processor().processNamespace();
|
||||||
|
|
||||||
return ExitStatus.IN_PROGRESS;
|
return ExitStatus.IN_PROGRESS;
|
||||||
|
@ -141,6 +144,14 @@ public class Mover {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DBlock newDBlock(Block block, List<MLocation> locations) {
|
||||||
|
final DBlock db = new DBlock(block);
|
||||||
|
for(MLocation ml : locations) {
|
||||||
|
db.addLocation(storages.getTarget(ml));
|
||||||
|
}
|
||||||
|
return db;
|
||||||
|
}
|
||||||
|
|
||||||
private static long getMaxRemaining(DatanodeStorageReport report, StorageType t) {
|
private static long getMaxRemaining(DatanodeStorageReport report, StorageType t) {
|
||||||
long max = 0L;
|
long max = 0L;
|
||||||
for(StorageReport r : report.getStorageReports()) {
|
for(StorageReport r : report.getStorageReports()) {
|
||||||
|
@ -169,11 +180,11 @@ public class Mover {
|
||||||
return sb.toString();
|
return sb.toString();
|
||||||
}
|
}
|
||||||
|
|
||||||
private class Processor {
|
class Processor {
|
||||||
private final DFSClient dfs;
|
private final DFSClient dfs;
|
||||||
private final List<String> snapshottableDirs = new ArrayList<String>();
|
private final List<String> snapshottableDirs = new ArrayList<String>();
|
||||||
|
|
||||||
private Processor() {
|
Processor() {
|
||||||
dfs = dispatcher.getDistributedFileSystem().getClient();
|
dfs = dispatcher.getDistributedFileSystem().getClient();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -290,15 +301,11 @@ public class Mover {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
|
void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
|
||||||
final List<MLocation> locations = MLocation.toLocations(lb);
|
final List<MLocation> locations = MLocation.toLocations(lb);
|
||||||
Collections.shuffle(locations);
|
Collections.shuffle(locations);
|
||||||
|
final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
|
||||||
final DBlock db = new DBlock(lb.getBlock().getLocalBlock());
|
|
||||||
for(MLocation ml : locations) {
|
|
||||||
db.addLocation(storages.getTarget(ml));
|
|
||||||
}
|
|
||||||
|
|
||||||
for(final Iterator<StorageType> i = diff.existing.iterator(); i.hasNext(); ) {
|
for(final Iterator<StorageType> i = diff.existing.iterator(); i.hasNext(); ) {
|
||||||
final StorageType t = i.next();
|
final StorageType t = i.next();
|
||||||
|
@ -310,12 +317,18 @@ public class Mover {
|
||||||
if (scheduleMoveReplica(db, ml, source, diff.expected)) {
|
if (scheduleMoveReplica(db, ml, source, diff.expected)) {
|
||||||
i.remove();
|
i.remove();
|
||||||
j.remove();
|
j.remove();
|
||||||
|
return;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
boolean scheduleMoveReplica(DBlock db, MLocation ml,
|
||||||
|
List<StorageType> targetTypes) {
|
||||||
|
return scheduleMoveReplica(db, ml, storages.getSource(ml), targetTypes);
|
||||||
|
}
|
||||||
|
|
||||||
boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
|
boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
|
||||||
List<StorageType> targetTypes) {
|
List<StorageType> targetTypes) {
|
||||||
if (dispatcher.getCluster().isNodeGroupAware()) {
|
if (dispatcher.getCluster().isNodeGroupAware()) {
|
||||||
|
@ -341,12 +354,10 @@ public class Mover {
|
||||||
for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
|
for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
|
||||||
final StorageType t = i.next();
|
final StorageType t = i.next();
|
||||||
for(StorageGroup target : storages.getTargetStorages(t)) {
|
for(StorageGroup target : storages.getTargetStorages(t)) {
|
||||||
if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())
|
if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())) {
|
||||||
&& dispatcher.isGoodBlockCandidate(source, target, t, db)) {
|
final PendingMove pm = source.addPendingMove(db, target);
|
||||||
final PendingMove pm = dispatcher.new PendingMove(db, source, target);
|
if (pm != null) {
|
||||||
if (pm.chooseProxySource()) {
|
|
||||||
i.remove();
|
i.remove();
|
||||||
target.incScheduledSize(ml.size);
|
|
||||||
dispatcher.executePendingMove(pm);
|
dispatcher.executePendingMove(pm);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
|
||||||
import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
|
import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
|
||||||
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.EnumSet;
|
import java.util.EnumSet;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
@ -62,20 +61,6 @@ public class TestBlockStoragePolicy {
|
||||||
static final byte WARM = (byte) 8;
|
static final byte WARM = (byte) 8;
|
||||||
static final byte HOT = (byte) 12;
|
static final byte HOT = (byte) 12;
|
||||||
|
|
||||||
static final List<List<StorageType>> chosens = new ArrayList<List<StorageType>>();
|
|
||||||
static {
|
|
||||||
chosens.add(Arrays.<StorageType>asList());
|
|
||||||
chosens.add(Arrays.asList(StorageType.DISK));
|
|
||||||
chosens.add(Arrays.asList(StorageType.ARCHIVE));
|
|
||||||
chosens.add(Arrays.asList(StorageType.DISK, StorageType.DISK));
|
|
||||||
chosens.add(Arrays.asList(StorageType.DISK, StorageType.ARCHIVE));
|
|
||||||
chosens.add(Arrays.asList(StorageType.ARCHIVE, StorageType.ARCHIVE));
|
|
||||||
chosens.add(Arrays.asList(StorageType.DISK, StorageType.DISK, StorageType.DISK));
|
|
||||||
chosens.add(Arrays.asList(StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE));
|
|
||||||
chosens.add(Arrays.asList(StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE));
|
|
||||||
chosens.add(Arrays.asList(StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE));
|
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDefaultPolicies() {
|
public void testDefaultPolicies() {
|
||||||
final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
|
final Map<Byte, String> expectedPolicyStrings = new HashMap<Byte, String>();
|
||||||
|
@ -126,6 +111,17 @@ public class TestBlockStoragePolicy {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static StorageType[] newStorageTypes(int nDisk, int nArchive) {
|
||||||
|
final StorageType[] t = new StorageType[nDisk + nArchive];
|
||||||
|
Arrays.fill(t, 0, nDisk, StorageType.DISK);
|
||||||
|
Arrays.fill(t, nDisk, t.length, StorageType.ARCHIVE);
|
||||||
|
return t;
|
||||||
|
}
|
||||||
|
|
||||||
|
static List<StorageType> asList(int nDisk, int nArchive) {
|
||||||
|
return Arrays.asList(newStorageTypes(nDisk, nArchive));
|
||||||
|
}
|
||||||
|
|
||||||
static void assertStorageType(List<StorageType> computed, short replication,
|
static void assertStorageType(List<StorageType> computed, short replication,
|
||||||
StorageType... answers) {
|
StorageType... answers) {
|
||||||
Assert.assertEquals(replication, computed.size());
|
Assert.assertEquals(replication, computed.size());
|
||||||
|
@ -369,10 +365,14 @@ public class TestBlockStoragePolicy {
|
||||||
final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
|
final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
|
||||||
|
|
||||||
final short replication = 3;
|
final short replication = 3;
|
||||||
for(List<StorageType> c : chosens) {
|
for(int n = 0; n <= 3; n++) {
|
||||||
method.checkChooseStorageTypes(hot, replication, c);
|
for(int d = 0; d <= n; d++) {
|
||||||
method.checkChooseStorageTypes(warm, replication, c);
|
final int a = n - d;
|
||||||
method.checkChooseStorageTypes(cold, replication, c);
|
final List<StorageType> chosen = asList(d, a);
|
||||||
|
method.checkChooseStorageTypes(hot, replication, chosen);
|
||||||
|
method.checkChooseStorageTypes(warm, replication, chosen);
|
||||||
|
method.checkChooseStorageTypes(cold, replication, chosen);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -714,6 +714,47 @@ public class TestBlockStoragePolicy {
|
||||||
Assert.assertArrayEquals(expected, computed);
|
Assert.assertArrayEquals(expected, computed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testChooseExcess() {
|
||||||
|
final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
|
||||||
|
final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
|
||||||
|
final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
|
||||||
|
|
||||||
|
final short replication = 3;
|
||||||
|
for(int n = 0; n <= 6; n++) {
|
||||||
|
for(int d = 0; d <= n; d++) {
|
||||||
|
final int a = n - d;
|
||||||
|
final List<StorageType> chosen = asList(d, a);
|
||||||
|
{
|
||||||
|
final int nDisk = Math.max(0, d - replication);
|
||||||
|
final int nArchive = a;
|
||||||
|
final StorageType[] expected = newStorageTypes(nDisk, nArchive);
|
||||||
|
checkChooseExcess(hot, replication, chosen, expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
final int nDisk = Math.max(0, d - 1);
|
||||||
|
final int nArchive = Math.max(0, a - replication + 1);
|
||||||
|
final StorageType[] expected = newStorageTypes(nDisk, nArchive);
|
||||||
|
checkChooseExcess(warm, replication, chosen, expected);
|
||||||
|
}
|
||||||
|
|
||||||
|
{
|
||||||
|
final int nDisk = d;
|
||||||
|
final int nArchive = Math.max(0, a - replication );
|
||||||
|
final StorageType[] expected = newStorageTypes(nDisk, nArchive);
|
||||||
|
checkChooseExcess(cold, replication, chosen, expected);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static void checkChooseExcess(BlockStoragePolicy p, short replication,
|
||||||
|
List<StorageType> chosen, StorageType... expected) {
|
||||||
|
final List<StorageType> types = p.chooseExcess(replication, chosen);
|
||||||
|
assertStorageTypes(types, expected);
|
||||||
|
}
|
||||||
|
|
||||||
private void checkDirectoryListing(HdfsFileStatus[] stats, byte... policies) {
|
private void checkDirectoryListing(HdfsFileStatus[] stats, byte... policies) {
|
||||||
Assert.assertEquals(stats.length, policies.length);
|
Assert.assertEquals(stats.length, policies.length);
|
||||||
for (int i = 0; i < stats.length; i++) {
|
for (int i = 0; i < stats.length; i++) {
|
||||||
|
|
|
@ -0,0 +1,85 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.mover;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.URI;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collection;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||||
|
import org.apache.hadoop.fs.Path;
|
||||||
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.StorageType;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
|
||||||
|
import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
|
||||||
|
import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
public class TestMover {
|
||||||
|
static Mover newMover(Configuration conf) throws IOException {
|
||||||
|
final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
|
||||||
|
Assert.assertEquals(1, namenodes.size());
|
||||||
|
|
||||||
|
final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
|
||||||
|
namenodes, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf);
|
||||||
|
return new Mover(nncs.get(0), conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testScheduleSameBlock() throws IOException {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(4).build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
final String file = "/testScheduleSameBlock/file";
|
||||||
|
|
||||||
|
{
|
||||||
|
final FSDataOutputStream out = dfs.create(new Path(file));
|
||||||
|
out.writeChars("testScheduleSameBlock");
|
||||||
|
out.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
final Mover mover = newMover(conf);
|
||||||
|
mover.init();
|
||||||
|
final Mover.Processor processor = mover.new Processor();
|
||||||
|
|
||||||
|
final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||||
|
final List<MLocation> locations = MLocation.toLocations(lb);
|
||||||
|
final MLocation ml = locations.get(0);
|
||||||
|
final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations);
|
||||||
|
|
||||||
|
final List<StorageType> storageTypes = new ArrayList<StorageType>(
|
||||||
|
Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT));
|
||||||
|
Assert.assertTrue(processor.scheduleMoveReplica(db, ml, storageTypes));
|
||||||
|
Assert.assertFalse(processor.scheduleMoveReplica(db, ml, storageTypes));
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue