diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index bb498cca837..41ea1f38661 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -175,11 +175,7 @@ public class Dispatcher {
     private DDatanode proxySource;
     private StorageGroup target;
 
-    private PendingMove() {
-    }
-
-    public PendingMove(DBlock block, Source source, StorageGroup target) {
-      this.block = block;
+    private PendingMove(Source source, StorageGroup target) {
       this.source = source;
       this.target = target;
     }
@@ -199,9 +195,11 @@ public class Dispatcher {
      * @return true if a block and its proxy are chosen; false otherwise
      */
     private boolean chooseBlockAndProxy() {
+      // source and target must have the same storage type
+      final StorageType t = source.getStorageType();
       // iterate all source's blocks until find a good one
       for (Iterator<DBlock> i = source.getBlockIterator(); i.hasNext();) {
-        if (markMovedIfGoodBlock(i.next())) {
+        if (markMovedIfGoodBlock(i.next(), t)) {
           i.remove();
           return true;
         }
@@ -212,10 +210,10 @@
     /**
      * @return true if the given block is good for the tentative move.
      */
-    private boolean markMovedIfGoodBlock(DBlock block) {
+    private boolean markMovedIfGoodBlock(DBlock block, StorageType targetStorageType) {
       synchronized (block) {
         synchronized (movedBlocks) {
-          if (isGoodBlockCandidate(source, target, block)) {
+          if (isGoodBlockCandidate(source, target, targetStorageType, block)) {
             this.block = block;
             if (chooseProxySource()) {
               movedBlocks.put(block);
@@ -235,7 +233,7 @@
      *
      * @return true if a proxy is found; otherwise false
      */
-    public boolean chooseProxySource() {
+    private boolean chooseProxySource() {
       final DatanodeInfo targetDN = target.getDatanodeInfo();
       // if node group is supported, first try add nodes in the same node group
       if (cluster.isNodeGroupAware()) {
@@ -440,6 +438,18 @@
       scheduledSize = 0L;
     }
 
+    private PendingMove addPendingMove(DBlock block, final PendingMove pm) {
+      if (getDDatanode().addPendingBlock(pm)) {
+        if (pm.markMovedIfGoodBlock(block, getStorageType())) {
+          incScheduledSize(pm.block.getNumBytes());
+          return pm;
+        } else {
+          getDDatanode().removePendingBlock(pm);
+        }
+      }
+      return null;
+    }
+
     /** @return the name for display */
     String getDisplayName() {
       return datanode + ":" + storageType;
@@ -599,8 +609,11 @@
 
     /** Decide if the given block is a good candidate to move or not */
     private boolean isGoodBlockCandidate(DBlock block) {
+      // source and target must have the same storage type
+      final StorageType sourceStorageType = getStorageType();
       for (Task t : tasks) {
-        if (Dispatcher.this.isGoodBlockCandidate(this, t.target, block)) {
+        if (Dispatcher.this.isGoodBlockCandidate(this, t.target,
+            sourceStorageType, block)) {
           return true;
         }
       }
@@ -620,11 +633,9 @@
       for (Iterator<Task> i = tasks.iterator(); i.hasNext();) {
         final Task task = i.next();
         final DDatanode target = task.target.getDDatanode();
-        PendingMove pendingBlock = new PendingMove();
+        final PendingMove pendingBlock = new PendingMove(this, task.target);
         if (target.addPendingBlock(pendingBlock)) {
           // target is not busy, so do a tentative block allocation
-          pendingBlock.source = this;
-          pendingBlock.target = task.target;
           if (pendingBlock.chooseBlockAndProxy()) {
             long blockSize = pendingBlock.block.getNumBytes();
             incScheduledSize(-blockSize);
@@ -641,6 +652,11 @@
       }
       return null;
     }
+
+    /** Add a pending move */
+    public PendingMove addPendingMove(DBlock block, StorageGroup target) {
+      return target.addPendingMove(block, new PendingMove(this, target));
+    }
 
     /** Iterate all source's blocks to remove moved ones */
     private void removeMovedBlocks() {
@@ -901,12 +917,6 @@
     }
   }
 
-  private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
-      DBlock block) {
-    // match source and target storage type
-    return isGoodBlockCandidate(source, target, source.getStorageType(), block);
-  }
-
   /**
    * Decide if the block is a good candidate to be moved from source to target.
    * A block is a good candidate if
@@ -914,7 +924,7 @@
    * 2. the block does not have a replica on the target;
   * 3. doing the move does not reduce the number of racks that the block has
    */
-  public boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
+  private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target,
       StorageType targetStorageType, DBlock block) {
     if (target.storageType != targetStorageType) {
       return false;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java
index 54febc6fee7..f8d00711b87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java
@@ -31,6 +31,11 @@ public interface Matcher {
     public boolean match(NetworkTopology cluster, Node left, Node right) {
       return cluster.isOnSameNodeGroup(left, right);
     }
+
+    @Override
+    public String toString() {
+      return "SAME_NODE_GROUP";
+    }
   };
 
   /** Match datanodes in the same rack. */
@@ -39,6 +44,11 @@ public interface Matcher {
     public boolean match(NetworkTopology cluster, Node left, Node right) {
       return cluster.isOnSameRack(left, right);
     }
+
+    @Override
+    public String toString() {
+      return "SAME_RACK";
+    }
   };
 
   /** Match any datanode with any other datanode. */
@@ -47,5 +57,10 @@
     public boolean match(NetworkTopology cluster, Node left, Node right) {
       return left != right;
     }
+
+    @Override
+    public String toString() {
+      return "ANY_OTHER";
+    }
   };
 }
\ No newline at end of file
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
index 424998c624a..4dbe1d37c87 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java
@@ -50,7 +50,7 @@ import java.util.*;
 
 public class Mover {
   static final Log LOG = LogFactory.getLog(Mover.class);
-  private static final Path MOVER_ID_PATH = new Path("/system/mover.id");
+  static final Path MOVER_ID_PATH = new Path("/system/mover.id");
 
   private static class StorageMap {
     private final StorageGroupMap<Source> sources
@@ -111,22 +111,25 @@
     this.storages = new StorageMap();
     this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf);
   }
-
-  private ExitStatus run() {
-    try {
-      final List<DatanodeStorageReport> reports = dispatcher.init();
-      for(DatanodeStorageReport r : reports) {
-        final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
-        for(StorageType t : StorageType.asList()) {
-          final long maxRemaining = getMaxRemaining(r, t);
-          if (maxRemaining > 0L) {
-            final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
-            final StorageGroup target = dn.addTarget(t, maxRemaining);
-            storages.add(source, target);
-          }
+
+  void init() throws IOException {
+    final List<DatanodeStorageReport> reports = dispatcher.init();
+    for(DatanodeStorageReport r : reports) {
+      final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
+      for(StorageType t : StorageType.asList()) {
+        final long maxRemaining = getMaxRemaining(r, t);
+        if (maxRemaining > 0L) {
+          final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher);
+          final StorageGroup target = dn.addTarget(t, maxRemaining);
+          storages.add(source, target);
         }
       }
+    }
+  }
+
+  private ExitStatus run() {
+    try {
+      init();
 
       new Processor().processNamespace();
       return ExitStatus.IN_PROGRESS;
@@ -141,6 +144,14 @@
     }
   }
 
+  DBlock newDBlock(Block block, List<MLocation> locations) {
+    final DBlock db = new DBlock(block);
+    for(MLocation ml : locations) {
+      db.addLocation(storages.getTarget(ml));
+    }
+    return db;
+  }
+
   private static long getMaxRemaining(DatanodeStorageReport report, StorageType t) {
     long max = 0L;
     for(StorageReport r : report.getStorageReports()) {
@@ -169,11 +180,11 @@
     return sb.toString();
   }
 
-  private class Processor {
+  class Processor {
     private final DFSClient dfs;
     private final List<String> snapshottableDirs = new ArrayList<String>();
 
-    private Processor() {
+    Processor() {
       dfs = dispatcher.getDistributedFileSystem().getClient();
     }
@@ -290,15 +301,11 @@
         }
       }
     }
-
+
     void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
       final List<MLocation> locations = MLocation.toLocations(lb);
       Collections.shuffle(locations);
-
-      final DBlock db = new DBlock(lb.getBlock().getLocalBlock());
-      for(MLocation ml : locations) {
-        db.addLocation(storages.getTarget(ml));
-      }
+      final DBlock db = newDBlock(lb.getBlock().getLocalBlock(), locations);
 
       for(final Iterator<StorageType> i = diff.existing.iterator(); i.hasNext(); ) {
         final StorageType t = i.next();
@@ -310,12 +317,18 @@
           if (scheduleMoveReplica(db, ml, source, diff.expected)) {
             i.remove();
             j.remove();
+            return;
           }
         }
       }
     }
 
+    boolean scheduleMoveReplica(DBlock db, MLocation ml,
+        List<StorageType> targetTypes) {
+      return scheduleMoveReplica(db, ml, storages.getSource(ml), targetTypes);
+    }
+
     boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source,
         List<StorageType> targetTypes) {
       if (dispatcher.getCluster().isNodeGroupAware()) {
@@ -341,12 +354,10 @@
       for(final Iterator<StorageType> i = targetTypes.iterator(); i.hasNext(); ) {
         final StorageType t = i.next();
         for(StorageGroup target : storages.getTargetStorages(t)) {
-          if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())
-              && dispatcher.isGoodBlockCandidate(source, target, t, db)) {
-            final PendingMove pm = dispatcher.new PendingMove(db, source, target);
-            if (pm.chooseProxySource()) {
+          if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo())) {
+            final PendingMove pm = source.addPendingMove(db, target);
+            if (pm != null) {
               i.remove();
-              target.incScheduledSize(ml.size);
               dispatcher.executePendingMove(pm);
               return true;
             }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
index f2c9fb1a370..ff5e995a2f0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestBlockStoragePolicy.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs;
 import static org.apache.hadoop.hdfs.BlockStoragePolicy.ID_UNSPECIFIED;
 
 import java.io.FileNotFoundException;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.EnumSet;
 import java.util.HashMap;
@@ -62,20 +61,6 @@ public class TestBlockStoragePolicy {
   static final byte WARM = (byte) 8;
   static final byte HOT = (byte) 12;
 
-  static final List<List<StorageType>> chosens = new ArrayList<List<StorageType>>();
-  static {
-    chosens.add(Arrays.asList());
-    chosens.add(Arrays.asList(StorageType.DISK));
-    chosens.add(Arrays.asList(StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.DISK));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.ARCHIVE, StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.DISK, StorageType.DISK));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.DISK, StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.DISK, StorageType.ARCHIVE, StorageType.ARCHIVE));
-    chosens.add(Arrays.asList(StorageType.ARCHIVE, StorageType.ARCHIVE, StorageType.ARCHIVE));
-  }
-
   @Test
   public void testDefaultPolicies() {
     final Map expectedPolicyStrings = new HashMap();
@@ -126,6 +111,17 @@
     }
   }
 
+  static StorageType[] newStorageTypes(int nDisk, int nArchive) {
+    final StorageType[] t = new StorageType[nDisk + nArchive];
+    Arrays.fill(t, 0, nDisk, StorageType.DISK);
+    Arrays.fill(t, nDisk, t.length, StorageType.ARCHIVE);
+    return t;
+  }
+
+  static List<StorageType> asList(int nDisk, int nArchive) {
+    return Arrays.asList(newStorageTypes(nDisk, nArchive));
+  }
+
   static void assertStorageType(List<StorageType> computed, short replication,
       StorageType... answers) {
     Assert.assertEquals(replication, computed.size());
@@ -369,10 +365,14 @@
     final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
 
     final short replication = 3;
-    for(List<StorageType> c : chosens) {
-      method.checkChooseStorageTypes(hot, replication, c);
-      method.checkChooseStorageTypes(warm, replication, c);
-      method.checkChooseStorageTypes(cold, replication, c);
+    for(int n = 0; n <= 3; n++) {
+      for(int d = 0; d <= n; d++) {
+        final int a = n - d;
+        final List<StorageType> chosen = asList(d, a);
+        method.checkChooseStorageTypes(hot, replication, chosen);
+        method.checkChooseStorageTypes(warm, replication, chosen);
+        method.checkChooseStorageTypes(cold, replication, chosen);
+      }
     }
   }
@@ -714,6 +714,47 @@
     Assert.assertArrayEquals(expected, computed);
   }
 
+  @Test
+  public void testChooseExcess() {
+    final BlockStoragePolicy hot = POLICY_SUITE.getPolicy(HOT);
+    final BlockStoragePolicy warm = POLICY_SUITE.getPolicy(WARM);
+    final BlockStoragePolicy cold = POLICY_SUITE.getPolicy(COLD);
+
+    final short replication = 3;
+    for(int n = 0; n <= 6; n++) {
+      for(int d = 0; d <= n; d++) {
+        final int a = n - d;
+        final List<StorageType> chosen = asList(d, a);
+        {
+          final int nDisk = Math.max(0, d - replication);
+          final int nArchive = a;
+          final StorageType[] expected = newStorageTypes(nDisk, nArchive);
+          checkChooseExcess(hot, replication, chosen, expected);
+        }
+
+        {
+          final int nDisk = Math.max(0, d - 1);
+          final int nArchive = Math.max(0, a - replication + 1);
+          final StorageType[] expected = newStorageTypes(nDisk, nArchive);
+          checkChooseExcess(warm, replication, chosen, expected);
+        }
+
+        {
+          final int nDisk = d;
+          final int nArchive = Math.max(0, a - replication);
+          final StorageType[] expected = newStorageTypes(nDisk, nArchive);
+          checkChooseExcess(cold, replication, chosen, expected);
+        }
+      }
+    }
+  }
+
+  static void checkChooseExcess(BlockStoragePolicy p, short replication,
+      List<StorageType> chosen, StorageType... expected) {
+    final List<StorageType> types = p.chooseExcess(replication, chosen);
+    assertStorageTypes(types, expected);
+  }
+
   private void checkDirectoryListing(HdfsFileStatus[] stats, byte... policies) {
     Assert.assertEquals(stats.length, policies.length);
     for (int i = 0; i < stats.length; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
new file mode 100644
index 00000000000..da913e70baf
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.mover;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.DFSUtil;
+import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.hdfs.HdfsConfiguration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.StorageType;
+import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
+import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
+import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestMover {
+  static Mover newMover(Configuration conf) throws IOException {
+    final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
+    Assert.assertEquals(1, namenodes.size());
+
+    final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
+        namenodes, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf);
+    return new Mover(nncs.get(0), conf);
+  }
+
+  @Test
+  public void testScheduleSameBlock() throws IOException {
+    final Configuration conf = new HdfsConfiguration();
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(4).build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testScheduleSameBlock/file";
+
+      {
+        final FSDataOutputStream out = dfs.create(new Path(file));
+        out.writeChars("testScheduleSameBlock");
+        out.close();
+      }
+
+      final Mover mover = newMover(conf);
+      mover.init();
+      final Mover.Processor processor = mover.new Processor();
+
+      final LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      final List<MLocation> locations = MLocation.toLocations(lb);
+      final MLocation ml = locations.get(0);
+      final DBlock db = mover.newDBlock(lb.getBlock().getLocalBlock(), locations);
+
+      final List<StorageType> storageTypes = new ArrayList<StorageType>(
+          Arrays.asList(StorageType.DEFAULT, StorageType.DEFAULT));
+      Assert.assertTrue(processor.scheduleMoveReplica(db, ml, storageTypes));
+      Assert.assertFalse(processor.scheduleMoveReplica(db, ml, storageTypes));
+    } finally {
+      cluster.shutdown();
+    }
+  }
+}