diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index e204ca290ba..a048bf9b23b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -102,6 +102,10 @@ Release 2.6.0 - UNRELEASED HDFS-6787. Remove duplicate code in FSDirectory#unprotectedConcat. (Yi Liu via umamahesh) + HDFS-6809. Move Balancer's inner classes MovedBlocks and Matcher to + standalone classes and separate KeyManager from NameNodeConnector. + (szetszwo) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 62c5c51ffbf..cc125388359 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -58,6 +58,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; +import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -85,7 +86,6 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; -import org.apache.hadoop.net.Node; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.StringUtils; @@ -196,10 +196,12 @@ import com.google.common.base.Preconditions; @InterfaceAudience.Private public class Balancer { static final Log LOG = LogFactory.getLog(Balancer.class); - final private static long GB = 1L << 30; //1GB - final private static long MAX_SIZE_TO_MOVE = 10*GB; - final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB; - private static long WIN_WIDTH = 5400*1000L; // 1.5 hour + + private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); + + private static final long GB = 1L << 30; //1GB + private static final long MAX_SIZE_TO_MOVE = 10*GB; + private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB; /** The maximum number of concurrent blocks moves for * balancing purpose at a datanode @@ -220,6 +222,8 @@ public class Balancer { + "\tIncludes only the specified datanodes."; private final NameNodeConnector nnc; + private final KeyManager keyManager; + private final BalancingPolicy policy; private final SaslDataTransferClient saslClient; private final double threshold; @@ -242,7 +246,8 @@ public class Balancer { private final Map<Block, BalancerBlock> globalBlockList = new HashMap<Block, BalancerBlock>(); - private final MovedBlocks movedBlocks = new MovedBlocks(); + private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks; + /** Map (datanodeUuid,storageType -> StorageGroup) */ private final StorageGroupMap storageGroupMap = new StorageGroupMap(); @@ -327,7 +332,7 @@ public class Balancer { if (isGoodBlockCandidate(source, target, block)) { this.block = block; if ( chooseProxySource() ) { - movedBlocks.add(block); + movedBlocks.put(block); if (LOG.isDebugEnabled()) { LOG.debug("Decided to move " + this); } @@ -400,10 +405,10 @@ public class Balancer { OutputStream unbufOut = sock.getOutputStream(); InputStream unbufIn =
sock.getInputStream(); - ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); - Token accessToken = nnc.getAccessToken(eb); + ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock()); + Token accessToken = keyManager.getAccessToken(eb); IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, - unbufIn, nnc, accessToken, target.getDatanode()); + unbufIn, keyManager, accessToken, target.getDatanode()); unbufOut = saslStreams.out; unbufIn = saslStreams.in; out = new DataOutputStream(new BufferedOutputStream(unbufOut, @@ -484,47 +489,9 @@ public class Balancer { } /* A class for keeping track of blocks in the Balancer */ - static private class BalancerBlock { - private final Block block; // the block - /** The locations of the replicas of the block. */ - private final List locations - = new ArrayList(3); - - /* Constructor */ - private BalancerBlock(Block block) { - this.block = block; - } - - /* clean block locations */ - private synchronized void clearLocations() { - locations.clear(); - } - - /* add a location */ - private synchronized void addLocation(BalancerDatanode.StorageGroup g) { - if (!locations.contains(g)) { - locations.add(g); - } - } - - /** @return if the block is located on the given storage group. */ - private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) { - return locations.contains(g); - } - - /* Return its locations */ - private synchronized List getLocations() { - return locations; - } - - /* Return the block */ - private Block getBlock() { - return block; - } - - /* Return the length of the block */ - private long getNumBytes() { - return block.getNumBytes(); + static class BalancerBlock extends MovedBlocks.Locations { + BalancerBlock(Block block) { + super(block); } } @@ -736,7 +703,7 @@ public class Balancer { */ private long getBlockList() throws IOException { final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); - final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks( + final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks( getDatanode(), size).getBlocks(); long bytesReceived = 0; @@ -820,7 +787,7 @@ public class Balancer { private void filterMovedBlocks() { for (Iterator blocks=getBlockIterator(); blocks.hasNext();) { - if (movedBlocks.contains(blocks.next())) { + if (movedBlocks.contains(blocks.next().getBlock())) { blocks.remove(); } } @@ -926,6 +893,13 @@ public class Balancer { this.nodesToBeExcluded = p.nodesToBeExcluded; this.nodesToBeIncluded = p.nodesToBeIncluded; this.nnc = theblockpool; + this.keyManager = nnc.getKeyManager(); + + final long movedWinWidth = conf.getLong( + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, + DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); + movedBlocks = new MovedBlocks(movedWinWidth); + cluster = NetworkTopology.getInstance(conf); this.moverExecutor = Executors.newFixedThreadPool( @@ -1095,36 +1069,6 @@ public class Balancer { LOG.info(items.size() + " " + name + ": " + items); } - /** A matcher interface for matching nodes. */ - private interface Matcher { - /** Given the cluster topology, does the left node match the right node? */ - boolean match(NetworkTopology cluster, Node left, Node right); - } - - /** Match datanodes in the same node group. */ - static final Matcher SAME_NODE_GROUP = new Matcher() { - @Override - public boolean match(NetworkTopology cluster, Node left, Node right) { - return cluster.isOnSameNodeGroup(left, right); - } - }; - - /** Match datanodes in the same rack. 
*/ - static final Matcher SAME_RACK = new Matcher() { - @Override - public boolean match(NetworkTopology cluster, Node left, Node right) { - return cluster.isOnSameRack(left, right); - } - }; - - /** Match any datanode with any other datanode. */ - static final Matcher ANY_OTHER = new Matcher() { - @Override - public boolean match(NetworkTopology cluster, Node left, Node right) { - return left != right; - } - }; - /** * Decide all pairs and * the number of bytes to move from a source to a target @@ -1135,13 +1079,13 @@ public class Balancer { private long chooseStorageGroups() { // First, match nodes on the same node group if cluster is node group aware if (cluster.isNodeGroupAware()) { - chooseStorageGroups(SAME_NODE_GROUP); + chooseStorageGroups(Matcher.SAME_NODE_GROUP); } // Then, match nodes on the same rack - chooseStorageGroups(SAME_RACK); + chooseStorageGroups(Matcher.SAME_RACK); // At last, match all remaining nodes - chooseStorageGroups(ANY_OTHER); + chooseStorageGroups(Matcher.ANY_OTHER); Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(), "Mismatched number of datanodes (" + storageGroupMap.size() + " < " @@ -1308,56 +1252,6 @@ public class Balancer { } while (shouldWait); } - /** This window makes sure to keep blocks that have been moved within 1.5 hour. - * Old window has blocks that are older; - * Current window has blocks that are more recent; - * Cleanup method triggers the check if blocks in the old window are - * more than 1.5 hour old. If yes, purge the old window and then - * move blocks in current window to old window. - */ - private static class MovedBlocks { - private long lastCleanupTime = Time.now(); - final private static int CUR_WIN = 0; - final private static int OLD_WIN = 1; - final private static int NUM_WINS = 2; - final private List> movedBlocks = - new ArrayList>(NUM_WINS); - - /* initialize the moved blocks collection */ - private MovedBlocks() { - movedBlocks.add(new HashMap()); - movedBlocks.add(new HashMap()); - } - - /* add a block thus marking a block to be moved */ - synchronized private void add(BalancerBlock block) { - movedBlocks.get(CUR_WIN).put(block.getBlock(), block); - } - - /* check if a block is marked as moved */ - synchronized private boolean contains(BalancerBlock block) { - return contains(block.getBlock()); - } - - /* check if a block is marked as moved */ - synchronized private boolean contains(Block block) { - return movedBlocks.get(CUR_WIN).containsKey(block) || - movedBlocks.get(OLD_WIN).containsKey(block); - } - - /* remove old blocks */ - synchronized private void cleanup() { - long curTime = Time.now(); - // check if old win is older than winWidth - if (lastCleanupTime + WIN_WIDTH <= curTime) { - // purge the old window - movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN)); - movedBlocks.set(CUR_WIN, new HashMap()); - lastCleanupTime = curTime; - } - } - } - /* Decide if it is OK to move the given block from source to target * A block is a good candidate if * 1. 
the block is not in the process of being moved/has not been moved; @@ -1370,7 +1264,7 @@ public class Balancer { return false; } // check if the block is moved or not - if (movedBlocks.contains(block)) { + if (movedBlocks.contains(block.getBlock())) { return false; } if (block.isLocatedOn(target)) { @@ -1388,7 +1282,7 @@ public class Balancer { } else { boolean notOnSameRack = true; synchronized (block) { - for (BalancerDatanode.StorageGroup loc : block.locations) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) { notOnSameRack = false; break; @@ -1400,7 +1294,7 @@ public class Balancer { goodBlock = true; } else { // good if source is on the same rack as on of the replicas - for (BalancerDatanode.StorageGroup loc : block.locations) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (loc != source && cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) { goodBlock = true; @@ -1426,7 +1320,7 @@ public class Balancer { private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target, BalancerBlock block, Source source) { final DatanodeInfo targetDn = target.getDatanode(); - for (BalancerDatanode.StorageGroup loc : block.locations) { + for (BalancerDatanode.StorageGroup loc : block.getLocations()) { if (loc != source && cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) { return true; @@ -1490,7 +1384,7 @@ public class Balancer { * decide the number of bytes need to be moved */ final long bytesLeftToMove = init( - nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE)); + nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE)); if (bytesLeftToMove == 0) { System.out.println("The cluster is balanced. Exiting..."); return ReturnStatus.SUCCESS; @@ -1559,8 +1453,8 @@ public class Balancer { final long sleeptime = 2000*conf.getLong( DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); - LOG.info("namenodes = " + namenodes); - LOG.info("p = " + p); + LOG.info("namenodes = " + namenodes); + LOG.info("parameters = " + p); final Formatter formatter = new Formatter(System.out); System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); @@ -1569,7 +1463,10 @@ public class Balancer { = new ArrayList(namenodes.size()); try { for (URI uri : namenodes) { - connectors.add(new NameNodeConnector(uri, conf)); + final NameNodeConnector nnc = new NameNodeConnector( + Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf); + nnc.getKeyManager().startBlockKeyUpdater(); + connectors.add(nnc); } boolean done = false; @@ -1731,9 +1628,6 @@ public class Balancer { public int run(String[] args) { final long startTime = Time.now(); final Configuration conf = getConf(); - WIN_WIDTH = conf.getLong( - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, - DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT); try { checkReplicationPolicyCompatibility(conf); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java new file mode 100644 index 00000000000..2ac8f483ee2 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/KeyManager.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import java.io.Closeable; +import java.io.IOException; +import java.util.EnumSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.protocol.ExtendedBlock; +import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; +import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode; +import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; +import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; +import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; +import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.Daemon; +import org.apache.hadoop.util.StringUtils; + +/** + * The class provides utilities for key and token management. 
+ */ +@InterfaceAudience.Private +public class KeyManager implements Closeable, DataEncryptionKeyFactory { + private static final Log LOG = LogFactory.getLog(KeyManager.class); + + private final NamenodeProtocol namenode; + + private final boolean isBlockTokenEnabled; + private final boolean encryptDataTransfer; + private boolean shouldRun; + + private final BlockTokenSecretManager blockTokenSecretManager; + private final BlockKeyUpdater blockKeyUpdater; + private DataEncryptionKey encryptionKey; + + public KeyManager(String blockpoolID, NamenodeProtocol namenode, + boolean encryptDataTransfer, Configuration conf) throws IOException { + this.namenode = namenode; + this.encryptDataTransfer = encryptDataTransfer; + + final ExportedBlockKeys keys = namenode.getBlockKeys(); + this.isBlockTokenEnabled = keys.isBlockTokenEnabled(); + if (isBlockTokenEnabled) { + long updateInterval = keys.getKeyUpdateInterval(); + long tokenLifetime = keys.getTokenLifetime(); + LOG.info("Block token params received from NN: update interval=" + + StringUtils.formatTime(updateInterval) + + ", token lifetime=" + StringUtils.formatTime(tokenLifetime)); + String encryptionAlgorithm = conf.get( + DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY); + this.blockTokenSecretManager = new BlockTokenSecretManager( + updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm); + this.blockTokenSecretManager.addKeys(keys); + + // sync block keys with NN more frequently than NN updates its block keys + this.blockKeyUpdater = new BlockKeyUpdater(updateInterval / 4); + this.shouldRun = true; + } else { + this.blockTokenSecretManager = null; + this.blockKeyUpdater = null; + } + } + + public void startBlockKeyUpdater() { + if (blockKeyUpdater != null) { + blockKeyUpdater.daemon.start(); + } + } + + /** Get an access token for a block. */ + public Token getAccessToken(ExtendedBlock eb + ) throws IOException { + if (!isBlockTokenEnabled) { + return BlockTokenSecretManager.DUMMY_TOKEN; + } else { + if (!shouldRun) { + throw new IOException( + "Cannot get access token since BlockKeyUpdater is not running"); + } + return blockTokenSecretManager.generateToken(null, eb, + EnumSet.of(AccessMode.REPLACE, AccessMode.COPY)); + } + } + + @Override + public DataEncryptionKey newDataEncryptionKey() { + if (encryptDataTransfer) { + synchronized (this) { + if (encryptionKey == null) { + encryptionKey = blockTokenSecretManager.generateDataEncryptionKey(); + } + return encryptionKey; + } + } else { + return null; + } + } + + @Override + public void close() { + shouldRun = false; + try { + if (blockKeyUpdater != null) { + blockKeyUpdater.daemon.interrupt(); + } + } catch(Exception e) { + LOG.warn("Exception shutting down access key updater thread", e); + } + } + + /** + * Periodically updates access keys. 
+ */ + class BlockKeyUpdater implements Runnable, Closeable { + private final Daemon daemon = new Daemon(this); + private final long sleepInterval; + + BlockKeyUpdater(final long sleepInterval) { + this.sleepInterval = sleepInterval; + LOG.info("Update block keys every " + StringUtils.formatTime(sleepInterval)); + } + + @Override + public void run() { + try { + while (shouldRun) { + try { + blockTokenSecretManager.addKeys(namenode.getBlockKeys()); + } catch (IOException e) { + LOG.error("Failed to set keys", e); + } + Thread.sleep(sleepInterval); + } + } catch (InterruptedException e) { + LOG.debug("InterruptedException in block key updater thread", e); + } catch (Throwable e) { + LOG.error("Exception in block key updater thread", e); + shouldRun = false; + } + } + + @Override + public void close() throws IOException { + try { + daemon.interrupt(); + } catch(Exception e) { + LOG.warn("Exception shutting down key updater thread", e); + } + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java new file mode 100644 index 00000000000..54febc6fee7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Matcher.java @@ -0,0 +1,51 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; + +/** A matcher interface for matching nodes. */ +public interface Matcher { + /** Given the cluster topology, does the left node match the right node? */ + public boolean match(NetworkTopology cluster, Node left, Node right); + + /** Match datanodes in the same node group. */ + public static final Matcher SAME_NODE_GROUP = new Matcher() { + @Override + public boolean match(NetworkTopology cluster, Node left, Node right) { + return cluster.isOnSameNodeGroup(left, right); + } + }; + + /** Match datanodes in the same rack. */ + public static final Matcher SAME_RACK = new Matcher() { + @Override + public boolean match(NetworkTopology cluster, Node left, Node right) { + return cluster.isOnSameRack(left, right); + } + }; + + /** Match any datanode with any other datanode. 
*/ + public static final Matcher ANY_OTHER = new Matcher() { + @Override + public boolean match(NetworkTopology cluster, Node left, Node right) { + return left != right; + } + }; +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java new file mode 100644 index 00000000000..557bfd36ab0 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/MovedBlocks.java @@ -0,0 +1,124 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.balancer; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.util.Time; + +/** + * This window makes sure to keep blocks that have been moved within a fixed + * time interval (default is 1.5 hours). The old window has the older blocks; + * the current window has the more recent ones. The cleanup method checks + * whether the blocks in the old window are older than the fixed time + * interval; if so, it purges the old window and then moves the blocks in the + * current window to the old window. + * + * @param <L> Location type + */ +public class MovedBlocks<L> { + /** A class for keeping track of a block and its locations */ + public static class Locations<L> { + private final Block block; // the block + /** The locations of the replicas of the block. */ + private final List<L> locations = new ArrayList<L>(3); + + public Locations(Block block) { + this.block = block; + } + + /** clean block locations */ + public synchronized void clearLocations() { + locations.clear(); + } + + /** add a location */ + public synchronized void addLocation(L loc) { + if (!locations.contains(loc)) { + locations.add(loc); + } + } + + /** @return whether the block is located on the given location. */ + public synchronized boolean isLocatedOn(L loc) { + return locations.contains(loc); + } + + /** @return its locations */ + public synchronized List<L> getLocations() { + return locations; + } + + /** @return the block */ + public Block getBlock() { + return block; + } + + /** @return the length of the block */ + public long getNumBytes() { + return block.getNumBytes(); + } + } + + private static final int CUR_WIN = 0; + private static final int OLD_WIN = 1; + private static final int NUM_WINS = 2; + + private final long winTimeInterval; + private long lastCleanupTime = Time.monotonicNow(); + private final List<Map<Block, Locations<L>>> movedBlocks + = new ArrayList<Map<Block, Locations<L>>>(NUM_WINS); + + /** initialize the moved blocks collection */ + public MovedBlocks(long winTimeInterval) { + this.winTimeInterval = winTimeInterval; + movedBlocks.add(newMap()); + movedBlocks.add(newMap()); + } + + private Map<Block, Locations<L>> newMap() { + return new HashMap<Block, Locations<L>>(); + } + + /** add a block thus marking a block to be moved */ + public synchronized void put(Locations<L> block) { + movedBlocks.get(CUR_WIN).put(block.getBlock(), block); + } + + /** @return whether a block is marked as moved */ + public synchronized boolean contains(Block block) { + return movedBlocks.get(CUR_WIN).containsKey(block) || + movedBlocks.get(OLD_WIN).containsKey(block); + } + + /** remove old blocks */ + public synchronized void cleanup() { + long curTime = Time.monotonicNow(); + // check if the old window is older than the window time interval + if (lastCleanupTime + winTimeInterval <= curTime) { + // purge the old window + movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN)); + movedBlocks.set(CUR_WIN, newMap()); + lastCleanupTime = curTime; + } + } +} \ No newline at end of file diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index fb322763d76..3d1c4e6946f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -17,113 +17,96 @@ */ package org.apache.hadoop.hdfs.server.balancer; +import java.io.Closeable; import java.io.DataOutputStream; import java.io.IOException; import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; -import java.util.EnumSet; import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.FsServerDefaults; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.ClientProtocol; -import org.apache.hadoop.hdfs.protocol.ExtendedBlock; -import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory; -import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier; -import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager; -import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import
org.apache.hadoop.io.IOUtils; import org.apache.hadoop.ipc.RemoteException; -import org.apache.hadoop.security.token.Token; -import org.apache.hadoop.util.Daemon; /** - * The class provides utilities for {@link Balancer} to access a NameNode + * The class provides utilities for accessing a NameNode. */ @InterfaceAudience.Private -class NameNodeConnector implements DataEncryptionKeyFactory { - private static final Log LOG = Balancer.LOG; - private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id"); +public class NameNodeConnector implements Closeable { + private static final Log LOG = LogFactory.getLog(NameNodeConnector.class); + private static final int MAX_NOT_CHANGED_ITERATIONS = 5; - final URI nameNodeUri; - final String blockpoolID; + private final URI nameNodeUri; + private final String blockpoolID; - final NamenodeProtocol namenode; - final ClientProtocol client; - final FileSystem fs; - final OutputStream out; + private final NamenodeProtocol namenode; + private final ClientProtocol client; + private final KeyManager keyManager; + + private final FileSystem fs; + private final Path idPath; + private final OutputStream out; - private final boolean isBlockTokenEnabled; - private final boolean encryptDataTransfer; - private boolean shouldRun; - private long keyUpdaterInterval; - // used for balancer private int notChangedIterations = 0; - private BlockTokenSecretManager blockTokenSecretManager; - private Daemon keyupdaterthread; // AccessKeyUpdater thread - private DataEncryptionKey encryptionKey; - NameNodeConnector(URI nameNodeUri, + public NameNodeConnector(String name, URI nameNodeUri, Path idPath, Configuration conf) throws IOException { this.nameNodeUri = nameNodeUri; + this.idPath = idPath; - this.namenode = - NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class) - .getProxy(); - this.client = - NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class) - .getProxy(); + this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, + NamenodeProtocol.class).getProxy(); + this.client = NameNodeProxies.createProxy(conf, nameNodeUri, + ClientProtocol.class).getProxy(); this.fs = FileSystem.get(nameNodeUri, conf); final NamespaceInfo namespaceinfo = namenode.versionRequest(); this.blockpoolID = namespaceinfo.getBlockPoolID(); - final ExportedBlockKeys keys = namenode.getBlockKeys(); - this.isBlockTokenEnabled = keys.isBlockTokenEnabled(); - if (isBlockTokenEnabled) { - long blockKeyUpdateInterval = keys.getKeyUpdateInterval(); - long blockTokenLifetime = keys.getTokenLifetime(); - LOG.info("Block token params received from NN: keyUpdateInterval=" - + blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime=" - + blockTokenLifetime / (60 * 1000) + " min(s)"); - String encryptionAlgorithm = conf.get( - DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY); - this.blockTokenSecretManager = new BlockTokenSecretManager( - blockKeyUpdateInterval, blockTokenLifetime, blockpoolID, - encryptionAlgorithm); - this.blockTokenSecretManager.addKeys(keys); - /* - * Balancer should sync its block keys with NN more frequently than NN - * updates its block keys - */ - this.keyUpdaterInterval = blockKeyUpdateInterval / 4; - LOG.info("Balancer will update its block keys every " - + keyUpdaterInterval / (60 * 1000) + " minute(s)"); - this.keyupdaterthread = new Daemon(new BlockKeyUpdater()); - this.shouldRun = true; - this.keyupdaterthread.start(); - } - this.encryptDataTransfer = fs.getServerDefaults(new Path("/")) - .getEncryptDataTransfer(); - // Check if 
there is another balancer running. + final FsServerDefaults defaults = fs.getServerDefaults(new Path("/")); + this.keyManager = new KeyManager(blockpoolID, namenode, + defaults.getEncryptDataTransfer(), conf); // Exit if there is another one running. - out = checkAndMarkRunningBalancer(); + out = checkAndMarkRunning(); if (out == null) { - throw new IOException("Another balancer is running"); + throw new IOException("Another " + name + " is running."); } } - boolean shouldContinue(long dispatchBlockMoveBytes) { + /** @return the block pool ID */ + public String getBlockpoolID() { + return blockpoolID; + } + + /** @return the namenode proxy. */ + public NamenodeProtocol getNamenode() { + return namenode; + } + + /** @return the client proxy. */ + public ClientProtocol getClient() { + return client; + } + + /** @return the key manager */ + public KeyManager getKeyManager() { + return keyManager; + } + + /** Should the instance continue running? */ + public boolean shouldContinue(long dispatchBlockMoveBytes) { if (dispatchBlockMoveBytes > 0) { notChangedIterations = 0; } else { @@ -137,53 +120,25 @@ class NameNodeConnector implements DataEncryptionKeyFactory { return true; } - /** Get an access token for a block. */ - Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb - ) throws IOException { - if (!isBlockTokenEnabled) { - return BlockTokenSecretManager.DUMMY_TOKEN; - } else { - if (!shouldRun) { - throw new IOException( - "Can not get access token. BlockKeyUpdater is not running"); - } - return blockTokenSecretManager.generateToken(null, eb, - EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE, - BlockTokenSecretManager.AccessMode.COPY)); - } - } - @Override - public DataEncryptionKey newDataEncryptionKey() { - if (encryptDataTransfer) { - synchronized (this) { - if (encryptionKey == null) { - encryptionKey = blockTokenSecretManager.generateDataEncryptionKey(); - } - return encryptionKey; - } - } else { - return null; - } - } - - /* The idea for making sure that there is no more than one balancer - * running in an HDFS is to create a file in the HDFS, writes the hostname - * of the machine on which the balancer is running to the file, but did not - * close the file until the balancer exits. - * This prevents the second balancer from running because it can not - * creates the file while the first one is running. + /** + * The idea for making sure that there is no more than one instance + * running in an HDFS is to create a file in HDFS, write the hostname of + * the machine on which the instance is running to the file, and keep the + * file open until the instance exits. + * This prevents a second instance from running because it cannot create + * the file while the first one is running. * - * This method checks if there is any running balancer and - * if no, mark yes if no. + * This method checks whether an instance is already running and, if not, + * marks this one as running. * Note that this is an atomic operation. * - * Return null if there is a running balancer; otherwise the output stream - * to the newly created file. + * @return null if there is a running instance; + * otherwise, the output stream to the newly created file. */ - private OutputStream checkAndMarkRunningBalancer() throws IOException { + private OutputStream checkAndMarkRunning() throws IOException { try { - final DataOutputStream out = fs.create(BALANCER_ID_PATH); + final DataOutputStream out = fs.create(idPath); out.writeBytes(InetAddress.getLocalHost().getHostName()); out.flush(); return out; @@ -196,24 +151,17 @@ class NameNodeConnector implements DataEncryptionKeyFactory { } } - /** Close the connection.
*/ - void close() { - shouldRun = false; - try { - if (keyupdaterthread != null) { - keyupdaterthread.interrupt(); - } - } catch(Exception e) { - LOG.warn("Exception shutting down access key updater thread", e); - } + @Override + public void close() { + keyManager.close(); // close the output file IOUtils.closeStream(out); if (fs != null) { try { - fs.delete(BALANCER_ID_PATH, true); + fs.delete(idPath, true); } catch(IOException ioe) { - LOG.warn("Failed to delete " + BALANCER_ID_PATH, ioe); + LOG.warn("Failed to delete " + idPath, ioe); } } } @@ -221,31 +169,6 @@ class NameNodeConnector implements DataEncryptionKeyFactory { @Override public String toString() { return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri - + ", id=" + blockpoolID - + "]"; - } - - /** - * Periodically updates access keys. - */ - class BlockKeyUpdater implements Runnable { - @Override - public void run() { - try { - while (shouldRun) { - try { - blockTokenSecretManager.addKeys(namenode.getBlockKeys()); - } catch (IOException e) { - LOG.error("Failed to set keys", e); - } - Thread.sleep(keyUpdaterInterval); - } - } catch (InterruptedException e) { - LOG.debug("InterruptedException in block key updater thread", e); - } catch (Throwable e) { - LOG.error("Exception in block key updater thread", e); - shouldRun = false; - } - } + + ", bpid=" + blockpoolID + "]"; } }
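
The sketches below are editor-added usage illustrations, not part of the patch. First, how a caller now reaches key and token machinery through KeyManager instead of NameNodeConnector. This is a minimal sketch assuming the patched hadoop-hdfs jar and a reachable NameNode; the URI, id-file path, and block ID are made-up placeholders:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
    import org.apache.hadoop.hdfs.server.balancer.KeyManager;
    import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;

    public class KeyManagerUsage {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The constructor writes this host's name to the id file and keeps the
        // file open, so a second instance using the same path fails to start.
        NameNodeConnector nnc = new NameNodeConnector("Balancer",
            URI.create("hdfs://nn.example.com:8020"),  // placeholder URI
            new Path("/system/balancer.id"), conf);
        try {
          KeyManager km = nnc.getKeyManager();
          km.startBlockKeyUpdater();  // keep block keys in sync with the NN
          // Returns DUMMY_TOKEN when block tokens are disabled.
          ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), 1L);
          System.out.println(km.getAccessToken(eb));
        } finally {
          nnc.close();  // also closes the KeyManager and deletes the id file
        }
      }
    }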
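The Matcher constants encode the Balancer's locality tiers: chooseStorageGroups() pairs sources with targets using SAME_NODE_GROUP first (when the topology is node-group aware), then SAME_RACK, and finally ANY_OTHER. A small sketch of the semantics against a made-up two-rack topology:

    import org.apache.hadoop.hdfs.server.balancer.Matcher;
    import org.apache.hadoop.net.NetworkTopology;
    import org.apache.hadoop.net.Node;
    import org.apache.hadoop.net.NodeBase;

    public class MatcherDemo {
      public static void main(String[] args) {
        NetworkTopology cluster = new NetworkTopology();
        Node n1 = new NodeBase("h1", "/rack1");
        Node n2 = new NodeBase("h2", "/rack1");
        Node n3 = new NodeBase("h3", "/rack2");
        cluster.add(n1);
        cluster.add(n2);
        cluster.add(n3);

        System.out.println(Matcher.SAME_RACK.match(cluster, n1, n2)); // true
        System.out.println(Matcher.SAME_RACK.match(cluster, n1, n3)); // false
        System.out.println(Matcher.ANY_OTHER.match(cluster, n1, n3)); // true
        // Always false on a plain NetworkTopology; node groups need
        // NetworkTopologyWithNodeGroup.
        System.out.println(Matcher.SAME_NODE_GROUP.match(cluster, n1, n2));
      }
    }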
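The two-window scheme in MovedBlocks means a block stays marked as moved for one to two window widths: cleanup() demotes the current window to the old window, and only a later cleanup() discards the old one. The Balancer now wires the width from dfs.balancer.movedWinWidth (default 5400000 ms, i.e. 1.5 hours) instead of the old static WIN_WIDTH. A short demonstration with a deliberately tiny 100 ms window:

    import org.apache.hadoop.hdfs.protocol.Block;
    import org.apache.hadoop.hdfs.server.balancer.MovedBlocks;

    public class MovedBlocksDemo {
      public static void main(String[] args) throws InterruptedException {
        MovedBlocks<String> moved = new MovedBlocks<String>(100L);

        Block b = new Block(1L, 1024L, 1L); // (blockId, numBytes, genStamp)
        moved.put(new MovedBlocks.Locations<String>(b));
        System.out.println(moved.contains(b)); // true: in the current window

        Thread.sleep(150L);
        moved.cleanup(); // current window is demoted to the old window
        System.out.println(moved.contains(b)); // still true

        Thread.sleep(150L);
        moved.cleanup(); // old window purged; the block is forgotten
        System.out.println(moved.contains(b)); // false
      }
    }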
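Finally, the mutual exclusion behind checkAndMarkRunning() is simply "create a file and hold it open": the NameNode grants a single-writer lease, so a second create() of the still-open file fails. A hedged sketch of the pattern on its own; the RemoteException handling here is an assumption about the catch block the hunk elides:

    import java.io.IOException;
    import java.io.OutputStream;
    import java.net.InetAddress;

    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
    import org.apache.hadoop.ipc.RemoteException;

    public class SingleInstanceLock {
      /** @return an open stream acting as the lock, or null if already held. */
      public static OutputStream tryLock(FileSystem fs, Path idPath)
          throws IOException {
        try {
          FSDataOutputStream out = fs.create(idPath);
          out.writeBytes(InetAddress.getLocalHost().getHostName());
          out.flush();
          return out; // keep open for the lifetime of the instance
        } catch (RemoteException e) {
          if (AlreadyBeingCreatedException.class.getName()
              .equals(e.getClassName())) {
            return null; // another instance holds the file open
          }
          throw e;
        }
      }
    }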