diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java index c39d04c7c74..a0935258870 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/BlockStoragePolicy.java @@ -180,7 +180,23 @@ public class BlockStoragePolicy { public StorageType getReplicationFallback(EnumSet unavailables) { return getFallback(unavailables, replicationFallbacks); } - + + @Override + public int hashCode() { + return Byte.valueOf(id).hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } else if (obj == null || !(obj instanceof BlockStoragePolicy)) { + return false; + } + final BlockStoragePolicy that = (BlockStoragePolicy)obj; + return this.id == that.id; + } + @Override public String toString() { return getClass().getSimpleName() + "{" + name + ":" + id @@ -193,6 +209,10 @@ public class BlockStoragePolicy { return id; } + public String getName() { + return name; + } + private static StorageType getFallback(EnumSet unavailables, StorageType[] fallbacks) { for(StorageType fb : fallbacks) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 564f0377613..be6faebc40a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -362,6 +362,12 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_BALANCER_MOVERTHREADS_DEFAULT = 1000; public static final String DFS_BALANCER_DISPATCHERTHREADS_KEY = "dfs.balancer.dispatcherThreads"; public static final int DFS_BALANCER_DISPATCHERTHREADS_DEFAULT = 200; + + public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; + public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; + public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads"; + public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000; + public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address"; public static final int DFS_DATANODE_DEFAULT_PORT = 50010; public static final String DFS_DATANODE_ADDRESS_DEFAULT = "0.0.0.0:" + DFS_DATANODE_DEFAULT_PORT; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 7661d25ee7f..ee49bcf488c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -23,7 +23,6 @@ import java.io.IOException; import java.io.PrintStream; import java.net.URI; import java.text.DateFormat; -import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -54,6 +53,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockPlacementPolicyDefault import org.apache.hadoop.hdfs.server.namenode.UnsupportedActionException; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; @@ -270,7 +270,7 @@ public class Balancer { // over-utilized, above-average, below-average and under-utilized. long overLoadedBytes = 0L, underLoadedBytes = 0L; for(DatanodeStorageReport r : reports) { - final DDatanode dn = dispatcher.newDatanode(r); + final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); for(StorageType t : StorageType.asList()) { final Double utilization = policy.getUtilization(r, t); if (utilization == null) { // datanode does not have such storage type @@ -294,7 +294,7 @@ public class Balancer { } g = s; } else { - g = dn.addStorageGroup(t, maxSize2Move); + g = dn.addTarget(t, maxSize2Move); if (thresholdDiff <= 0) { // within threshold belowAvgUtilized.add(g); } else { @@ -546,15 +546,10 @@ public class Balancer { final Formatter formatter = new Formatter(System.out); System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); - final List connectors - = new ArrayList(namenodes.size()); + List connectors = Collections.emptyList(); try { - for (URI uri : namenodes) { - final NameNodeConnector nnc = new NameNodeConnector( - Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf); - nnc.getKeyManager().startBlockKeyUpdater(); - connectors.add(nnc); - } + connectors = NameNodeConnector.newNameNodeConnectors(namenodes, + Balancer.class.getSimpleName(), BALANCER_ID_PATH, conf); boolean done = false; for(int iteration = 0; !done; iteration++) { @@ -579,7 +574,7 @@ public class Balancer { } } finally { for(NameNodeConnector nnc : connectors) { - nnc.close(); + IOUtils.cleanup(LOG, nnc); } } return ExitStatus.SUCCESS.getExitCode(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index b7dceb6f48d..bb498cca837 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -49,6 +49,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.StorageType; import org.apache.hadoop.hdfs.protocol.Block; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -103,7 +104,8 @@ public class Dispatcher { private final MovedBlocks movedBlocks; /** Map (datanodeUuid,storageType -> StorageGroup) */ - private final StorageGroupMap storageGroupMap = new StorageGroupMap(); + private final StorageGroupMap storageGroupMap + = new StorageGroupMap(); private NetworkTopology cluster; @@ -140,18 +142,18 @@ public class Dispatcher { } } - static class StorageGroupMap { + public static class StorageGroupMap { private static String toKey(String datanodeUuid, StorageType storageType) { return datanodeUuid + ":" + storageType; } - private final Map map = new HashMap(); + private final Map map = new HashMap(); - StorageGroup get(String datanodeUuid, StorageType storageType) { + public G get(String datanodeUuid, StorageType storageType) { return map.get(toKey(datanodeUuid, storageType)); } - void put(StorageGroup g) { + public void put(G g) { final String key = toKey(g.getDatanodeInfo().getDatanodeUuid(), g.storageType); final StorageGroup existing = map.put(key, g); Preconditions.checkState(existing == null); @@ -167,7 +169,7 @@ public class Dispatcher { } /** This class keeps track of a scheduled block move */ - private class PendingMove { + public class PendingMove { private DBlock block; private Source source; private DDatanode proxySource; @@ -176,6 +178,12 @@ public class Dispatcher { private PendingMove() { } + public PendingMove(DBlock block, Source source, StorageGroup target) { + this.block = block; + this.source = source; + this.target = target; + } + @Override public String toString() { final Block b = block.getBlock(); @@ -227,7 +235,7 @@ public class Dispatcher { * * @return true if a proxy is found; otherwise false */ - private boolean chooseProxySource() { + public boolean chooseProxySource() { final DatanodeInfo targetDN = target.getDatanodeInfo(); // if node group is supported, first try add nodes in the same node group if (cluster.isNodeGroupAware()) { @@ -356,8 +364,8 @@ public class Dispatcher { } /** A class for keeping track of block locations in the dispatcher. */ - private static class DBlock extends MovedBlocks.Locations { - DBlock(Block block) { + public static class DBlock extends MovedBlocks.Locations { + public DBlock(Block block) { super(block); } } @@ -378,10 +386,10 @@ public class Dispatcher { } /** A class that keeps track of a datanode. */ - static class DDatanode { + public static class DDatanode { /** A group of storages in a datanode with the same storage type. */ - class StorageGroup { + public class StorageGroup { final StorageType storageType; final long maxSize2Move; private long scheduledSize = 0L; @@ -390,18 +398,26 @@ public class Dispatcher { this.storageType = storageType; this.maxSize2Move = maxSize2Move; } + + public StorageType getStorageType() { + return storageType; + } private DDatanode getDDatanode() { return DDatanode.this; } - DatanodeInfo getDatanodeInfo() { + public DatanodeInfo getDatanodeInfo() { return DDatanode.this.datanode; } /** Decide if still need to move more bytes */ - synchronized boolean hasSpaceForScheduling() { - return availableSizeToMove() > 0L; + boolean hasSpaceForScheduling() { + return hasSpaceForScheduling(0L); + } + + synchronized boolean hasSpaceForScheduling(long size) { + return availableSizeToMove() > size; } /** @return the total number of bytes that need to be moved */ @@ -410,7 +426,7 @@ public class Dispatcher { } /** increment scheduled size */ - synchronized void incScheduledSize(long size) { + public synchronized void incScheduledSize(long size) { scheduledSize += size; } @@ -436,7 +452,9 @@ public class Dispatcher { } final DatanodeInfo datanode; - final EnumMap storageMap + private final EnumMap sourceMap + = new EnumMap(StorageType.class); + private final EnumMap targetMap = new EnumMap(StorageType.class); protected long delayUntil = 0L; /** blocks being moved but not confirmed yet */ @@ -445,29 +463,34 @@ public class Dispatcher { @Override public String toString() { - return getClass().getSimpleName() + ":" + datanode + ":" + storageMap.values(); + return getClass().getSimpleName() + ":" + datanode; } - private DDatanode(DatanodeStorageReport r, int maxConcurrentMoves) { - this.datanode = r.getDatanodeInfo(); + private DDatanode(DatanodeInfo datanode, int maxConcurrentMoves) { + this.datanode = datanode; this.maxConcurrentMoves = maxConcurrentMoves; this.pendings = new ArrayList(maxConcurrentMoves); } - private void put(StorageType storageType, StorageGroup g) { - final StorageGroup existing = storageMap.put(storageType, g); + public DatanodeInfo getDatanodeInfo() { + return datanode; + } + + private static void put(StorageType storageType, + G g, EnumMap map) { + final StorageGroup existing = map.put(storageType, g); Preconditions.checkState(existing == null); } - StorageGroup addStorageGroup(StorageType storageType, long maxSize2Move) { + public StorageGroup addTarget(StorageType storageType, long maxSize2Move) { final StorageGroup g = new StorageGroup(storageType, maxSize2Move); - put(storageType, g); + put(storageType, g, targetMap); return g; } - Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) { + public Source addSource(StorageType storageType, long maxSize2Move, Dispatcher d) { final Source s = d.new Source(storageType, maxSize2Move, this); - put(storageType, s); + put(storageType, s, sourceMap); return s; } @@ -508,7 +531,7 @@ public class Dispatcher { } /** A node that can be the sources of a block move */ - class Source extends DDatanode.StorageGroup { + public class Source extends DDatanode.StorageGroup { private final List tasks = new ArrayList(2); private long blocksToReceive = 0L; @@ -654,13 +677,7 @@ public class Dispatcher { && (!srcBlocks.isEmpty() || blocksToReceive > 0)) { final PendingMove p = chooseNextMove(); if (p != null) { - // move the block - moveExecutor.execute(new Runnable() { - @Override - public void run() { - p.dispatch(); - } - }); + executePendingMove(p); continue; } @@ -716,7 +733,8 @@ public class Dispatcher { this.cluster = NetworkTopology.getInstance(conf); this.moveExecutor = Executors.newFixedThreadPool(moverThreads); - this.dispatchExecutor = Executors.newFixedThreadPool(dispatcherThreads); + this.dispatchExecutor = dispatcherThreads == 0? null + : Executors.newFixedThreadPool(dispatcherThreads); this.maxConcurrentMovesPerNode = maxConcurrentMovesPerNode; final boolean fallbackToSimpleAuthAllowed = conf.getBoolean( @@ -727,11 +745,15 @@ public class Dispatcher { TrustedChannelResolver.getInstance(conf), fallbackToSimpleAuthAllowed); } - StorageGroupMap getStorageGroupMap() { + public DistributedFileSystem getDistributedFileSystem() { + return nnc.getDistributedFileSystem(); + } + + public StorageGroupMap getStorageGroupMap() { return storageGroupMap; } - NetworkTopology getCluster() { + public NetworkTopology getCluster() { return cluster; } @@ -779,7 +801,7 @@ public class Dispatcher { } /** Get live datanode storage reports and then build the network topology. */ - List init() throws IOException { + public List init() throws IOException { final DatanodeStorageReport[] reports = nnc.getLiveDatanodeStorageReport(); final List trimmed = new ArrayList(); // create network topology and classify utilization collections: @@ -795,8 +817,18 @@ public class Dispatcher { return trimmed; } - public DDatanode newDatanode(DatanodeStorageReport r) { - return new DDatanode(r, maxConcurrentMovesPerNode); + public DDatanode newDatanode(DatanodeInfo datanode) { + return new DDatanode(datanode, maxConcurrentMovesPerNode); + } + + public void executePendingMove(final PendingMove p) { + // move the block + moveExecutor.execute(new Runnable() { + @Override + public void run() { + p.dispatch(); + } + }); } public boolean dispatchAndCheckContinue() throws InterruptedException { @@ -869,6 +901,12 @@ public class Dispatcher { } } + private boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, + DBlock block) { + // match source and target storage type + return isGoodBlockCandidate(source, target, source.getStorageType(), block); + } + /** * Decide if the block is a good candidate to be moved from source to target. * A block is a good candidate if @@ -876,9 +914,12 @@ public class Dispatcher { * 2. the block does not have a replica on the target; * 3. doing the move does not reduce the number of racks that the block has */ - private boolean isGoodBlockCandidate(Source source, StorageGroup target, - DBlock block) { - if (source.storageType != target.storageType) { + public boolean isGoodBlockCandidate(StorageGroup source, StorageGroup target, + StorageType targetStorageType, DBlock block) { + if (target.storageType != targetStorageType) { + return false; + } + if (!target.hasSpaceForScheduling(block.getNumBytes())) { return false; } // check if the block is moved or not @@ -889,7 +930,7 @@ public class Dispatcher { return false; } if (cluster.isNodeGroupAware() - && isOnSameNodeGroupWithReplicas(target, block, source)) { + && isOnSameNodeGroupWithReplicas(source, target, block)) { return false; } if (reduceNumOfRacks(source, target, block)) { @@ -902,7 +943,7 @@ public class Dispatcher { * Determine whether moving the given block replica from source to target * would reduce the number of racks of the block replicas. */ - private boolean reduceNumOfRacks(Source source, StorageGroup target, + private boolean reduceNumOfRacks(StorageGroup source, StorageGroup target, DBlock block) { final DatanodeInfo sourceDn = source.getDatanodeInfo(); if (cluster.isOnSameRack(sourceDn, target.getDatanodeInfo())) { @@ -939,8 +980,8 @@ public class Dispatcher { * @return true if there are any replica (other than source) on the same node * group with target */ - private boolean isOnSameNodeGroupWithReplicas( - StorageGroup target, DBlock block, Source source) { + private boolean isOnSameNodeGroupWithReplicas(StorageGroup source, + StorageGroup target, DBlock block) { final DatanodeInfo targetDn = target.getDatanodeInfo(); for (StorageGroup g : block.getLocations()) { if (g != source && cluster.isOnSameNodeGroup(g.getDatanodeInfo(), targetDn)) { @@ -961,7 +1002,7 @@ public class Dispatcher { } /** shutdown thread pools */ - void shutdownNow() { + public void shutdownNow() { dispatchExecutor.shutdownNow(); moveExecutor.shutdownNow(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 820a4edd579..a86023e416c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -23,6 +23,9 @@ import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -31,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; @@ -51,6 +55,20 @@ public class NameNodeConnector implements Closeable { private static final Log LOG = LogFactory.getLog(NameNodeConnector.class); private static final int MAX_NOT_CHANGED_ITERATIONS = 5; + + /** Create {@link NameNodeConnector} for the given namenodes. */ + public static List newNameNodeConnectors( + Collection namenodes, String name, Path idPath, Configuration conf) + throws IOException { + final List connectors = new ArrayList( + namenodes.size()); + for (URI uri : namenodes) { + NameNodeConnector nnc = new NameNodeConnector(name, uri, idPath, conf); + nnc.getKeyManager().startBlockKeyUpdater(); + connectors.add(nnc); + } + return connectors; + } private final URI nameNodeUri; private final String blockpoolID; @@ -59,7 +77,7 @@ public class NameNodeConnector implements Closeable { private final ClientProtocol client; private final KeyManager keyManager; - private final FileSystem fs; + private final DistributedFileSystem fs; private final Path idPath; private final OutputStream out; @@ -74,7 +92,7 @@ public class NameNodeConnector implements Closeable { NamenodeProtocol.class).getProxy(); this.client = NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class).getProxy(); - this.fs = FileSystem.get(nameNodeUri, conf); + this.fs = (DistributedFileSystem)FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); this.blockpoolID = namespaceinfo.getBlockPoolID(); @@ -89,6 +107,10 @@ public class NameNodeConnector implements Closeable { } } + public DistributedFileSystem getDistributedFileSystem() { + return fs; + } + /** @return the block pool ID */ public String getBlockpoolID() { return blockpoolID; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java new file mode 100644 index 00000000000..a43abf24545 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -0,0 +1,431 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.mover; + +import java.io.IOException; +import java.net.URI; +import java.text.DateFormat; +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.EnumMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hdfs.BlockStoragePolicy; +import org.apache.hadoop.hdfs.DFSClient; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSUtil; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.StorageType; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; +import org.apache.hadoop.hdfs.protocol.DirectoryListing; +import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; +import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; +import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DDatanode.StorageGroup; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.PendingMove; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.Source; +import org.apache.hadoop.hdfs.server.balancer.Dispatcher.StorageGroupMap; +import org.apache.hadoop.hdfs.server.balancer.ExitStatus; +import org.apache.hadoop.hdfs.server.balancer.Matcher; +import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector; +import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; +import org.apache.hadoop.hdfs.server.protocol.StorageReport; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.util.Time; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; + +@InterfaceAudience.Private +public class Mover { + static final Log LOG = LogFactory.getLog(Mover.class); + + private static final Path MOVER_ID_PATH = new Path("/system/mover.id"); + + private static class StorageMap { + private final StorageGroupMap sources + = new StorageGroupMap(); + private final StorageGroupMap targets + = new StorageGroupMap(); + private final EnumMap> targetStorageTypeMap + = new EnumMap>(StorageType.class); + + private StorageMap() { + for(StorageType t : StorageType.asList()) { + targetStorageTypeMap.put(t, new LinkedList()); + } + } + + private void add(Source source, StorageGroup target) { + sources.put(source); + targets.put(target); + getTargetStorages(target.getStorageType()).add(target); + } + + private Source getSource(MLocation ml) { + return get(sources, ml); + } + + private StorageGroup getTarget(MLocation ml) { + return get(targets, ml); + } + + private static G get(StorageGroupMap map, MLocation ml) { + return map.get(ml.datanode.getDatanodeUuid(), ml.storageType); + } + + private List getTargetStorages(StorageType t) { + return targetStorageTypeMap.get(t); + } + } + + private final Dispatcher dispatcher; + private final StorageMap storages; + + private final BlockStoragePolicy.Suite blockStoragePolicies; + + Mover(NameNodeConnector nnc, Configuration conf) { + final long movedWinWidth = conf.getLong( + DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY, + DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT); + final int moverThreads = conf.getInt( + DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, + DFSConfigKeys.DFS_MOVER_MOVERTHREADS_DEFAULT); + final int maxConcurrentMovesPerNode = conf.getInt( + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, + DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT); + + this.dispatcher = new Dispatcher(nnc, Collections. emptySet(), + Collections. emptySet(), movedWinWidth, moverThreads, 0, + maxConcurrentMovesPerNode, conf); + this.storages = new StorageMap(); + this.blockStoragePolicies = BlockStoragePolicy.readBlockStorageSuite(conf); + } + + private ExitStatus run() { + try { + final List reports = dispatcher.init(); + for(DatanodeStorageReport r : reports) { + final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo()); + for(StorageType t : StorageType.asList()) { + final long maxRemaining = getMaxRemaining(r, t); + if (maxRemaining > 0L) { + final Source source = dn.addSource(t, Long.MAX_VALUE, dispatcher); + final StorageGroup target = dn.addTarget(t, maxRemaining); + storages.add(source, target); + } + } + } + + new Processor().processNamespace(); + + return ExitStatus.IN_PROGRESS; + } catch (IllegalArgumentException e) { + System.out.println(e + ". Exiting ..."); + return ExitStatus.ILLEGAL_ARGUMENTS; + } catch (IOException e) { + System.out.println(e + ". Exiting ..."); + return ExitStatus.IO_EXCEPTION; + } finally { + dispatcher.shutdownNow(); + } + } + + private static long getMaxRemaining(DatanodeStorageReport report, StorageType t) { + long max = 0L; + for(StorageReport r : report.getStorageReports()) { + if (r.getStorage().getStorageType() == t) { + if (r.getRemaining() > max) { + max = r.getRemaining(); + } + } + } + return max; + } + + private class Processor { + private final DFSClient dfs; + + private Processor() { + dfs = dispatcher.getDistributedFileSystem().getClient(); + } + + private void processNamespace() { + try { + processDirRecursively("", dfs.getFileInfo("/")); + } catch (IOException e) { + LOG.warn("Failed to get root directory status. Ignore and continue.", e); + } + } + + private void processDirRecursively(String parent, HdfsFileStatus status) { + if (status.isSymlink()) { + return; //ignore symlinks + } else if (status.isDir()) { + String dir = status.getFullName(parent); + if (!dir.endsWith(Path.SEPARATOR)) { + dir = dir + Path.SEPARATOR; + } + + for(byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) { + final DirectoryListing children; + try { + children = dfs.listPaths(dir, lastReturnedName, true); + } catch(IOException e) { + LOG.warn("Failed to list directory " + dir + + ". Ignore the directory and continue.", e); + return; + } + if (children == null) { + return; + } + for (HdfsFileStatus child : children.getPartialListing()) { + processDirRecursively(dir, child); + } + if (!children.hasMore()) { + lastReturnedName = children.getLastName(); + } else { + return; + } + } + } else { // file + processFile(parent, (HdfsLocatedFileStatus)status); + } + } + + private void processFile(String parent, HdfsLocatedFileStatus status) { + final BlockStoragePolicy policy = blockStoragePolicies.getPolicy( + status.getStoragePolicy()); + final List types = policy.chooseStorageTypes( + status.getReplication()); + + final LocatedBlocks locations = status.getBlockLocations(); + for(LocatedBlock lb : locations.getLocatedBlocks()) { + final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes()); + if (!diff.removeOverlap()) { + scheduleMoves4Block(diff, lb); + } + } + } + + void scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) { + final List locations = MLocation.toLocations(lb); + Collections.shuffle(locations); + + final DBlock db = new DBlock(lb.getBlock().getLocalBlock()); + for(MLocation ml : locations) { + db.addLocation(storages.getTarget(ml)); + } + + for(final Iterator i = diff.existing.iterator(); i.hasNext(); ) { + final StorageType t = i.next(); + for(final Iterator j = locations.iterator(); j.hasNext(); ) { + final MLocation ml = j.next(); + final Source source = storages.getSource(ml); + if (ml.storageType == t) { + // try to schedule replica move. + if (scheduleMoveReplica(db, ml, source, diff.expected)) { + i.remove(); + j.remove(); + } + } + } + } + } + + boolean scheduleMoveReplica(DBlock db, MLocation ml, Source source, + List targetTypes) { + if (dispatcher.getCluster().isNodeGroupAware()) { + if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_NODE_GROUP)) { + return true; + } + } + + // Then, match nodes on the same rack + if (chooseTarget(db, ml, source, targetTypes, Matcher.SAME_RACK)) { + return true; + } + // At last, match all remaining nodes + if (chooseTarget(db, ml, source, targetTypes, Matcher.ANY_OTHER)) { + return true; + } + return false; + } + + boolean chooseTarget(DBlock db, MLocation ml, Source source, + List targetTypes, Matcher matcher) { + final NetworkTopology cluster = dispatcher.getCluster(); + for(final Iterator i = targetTypes.iterator(); i.hasNext(); ) { + final StorageType t = i.next(); + for(StorageGroup target : storages.getTargetStorages(t)) { + if (matcher.match(cluster, ml.datanode, target.getDatanodeInfo()) + && dispatcher.isGoodBlockCandidate(source, target, t, db)) { + final PendingMove pm = dispatcher.new PendingMove(db, source, target); + if (pm.chooseProxySource()) { + i.remove(); + target.incScheduledSize(ml.size); + dispatcher.executePendingMove(pm); + return true; + } + } + } + } + return false; + } + } + + + static class MLocation { + final DatanodeInfo datanode; + final StorageType storageType; + final long size; + + MLocation(DatanodeInfo datanode, StorageType storageType, long size) { + this.datanode = datanode; + this.storageType = storageType; + this.size = size; + } + + static List toLocations(LocatedBlock lb) { + final DatanodeInfo[] datanodeInfos = lb.getLocations(); + final StorageType[] storageTypes = lb.getStorageTypes(); + final long size = lb.getBlockSize(); + final List locations = new LinkedList(); + for(int i = 0; i < datanodeInfos.length; i++) { + locations.add(new MLocation(datanodeInfos[i], storageTypes[i], size)); + } + return locations; + } + } + + private static class StorageTypeDiff { + final List expected; + final List existing; + + StorageTypeDiff(List expected, StorageType[] existing) { + this.expected = new LinkedList(expected); + this.existing = new LinkedList(Arrays.asList(existing)); + } + + /** + * Remove the overlap between the expected types and the existing types. + * @return if the existing types is empty after removed the overlap. + */ + boolean removeOverlap() { + for(Iterator i = existing.iterator(); i.hasNext(); ) { + final StorageType t = i.next(); + if (expected.remove(t)) { + i.remove(); + } + } + return existing.isEmpty(); + } + } + + static int run(Collection namenodes, Configuration conf) + throws IOException, InterruptedException { + final long sleeptime = 2000*conf.getLong( + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, + DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); + LOG.info("namenodes = " + namenodes); + + List connectors = Collections.emptyList(); + try { + connectors = NameNodeConnector.newNameNodeConnectors(namenodes, + Mover.class.getSimpleName(), MOVER_ID_PATH, conf); + + while (true) { + Collections.shuffle(connectors); + for(NameNodeConnector nnc : connectors) { + final Mover m = new Mover(nnc, conf); + final ExitStatus r = m.run(); + + if (r != ExitStatus.IN_PROGRESS) { + //must be an error statue, return. + return r.getExitCode(); + } + } + + Thread.sleep(sleeptime); + } + } finally { + for(NameNodeConnector nnc : connectors) { + IOUtils.cleanup(LOG, nnc); + } + } + } + + static class Cli extends Configured implements Tool { + private static final String USAGE = "Usage: java " + + Mover.class.getSimpleName(); + + @Override + public int run(String[] args) throws Exception { + final long startTime = Time.monotonicNow(); + final Configuration conf = getConf(); + + try { + final Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); + return Mover.run(namenodes, conf); + } catch (IOException e) { + System.out.println(e + ". Exiting ..."); + return ExitStatus.IO_EXCEPTION.getExitCode(); + } catch (InterruptedException e) { + System.out.println(e + ". Exiting ..."); + return ExitStatus.INTERRUPTED.getExitCode(); + } finally { + System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date())); + System.out.println("Mover took " + StringUtils.formatTime(Time.monotonicNow()-startTime)); + } + } + + /** + * Run a Mover in command line. + * + * @param args Command line arguments + */ + public static void main(String[] args) { + if (DFSUtil.parseHelpArgument(args, USAGE, System.out, true)) { + System.exit(0); + } + + try { + System.exit(ToolRunner.run(new HdfsConfiguration(), new Cli(), args)); + } catch (Throwable e) { + LOG.error("Exiting " + Mover.class.getSimpleName() + + " due to an exception", e); + System.exit(-1); + } + } + } +} \ No newline at end of file