svn merge -c 1616422 from trunk for HDFS-6809. Move Balancer's inner classes MovedBlocks and Matcher as to standalone classes and separates KeyManager from NameNodeConnector.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1616423 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2014-08-07 07:23:06 +00:00
parent f55c04a94b
commit 645ca2d8f5
6 changed files with 459 additions and 290 deletions

View File

@ -102,6 +102,10 @@ Release 2.6.0 - UNRELEASED
HDFS-6787. Remove duplicate code in FSDirectory#unprotectedConcat. (Yi Liu via umamahesh) HDFS-6787. Remove duplicate code in FSDirectory#unprotectedConcat. (Yi Liu via umamahesh)
HDFS-6809. Move Balancer's inner classes MovedBlocks and Matcher as to
standalone classes and separates KeyManager from NameNodeConnector.
(szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-6690. Deduplicate xattr names in memory. (wang) HDFS-6690. Deduplicate xattr names in memory. (wang)

View File

@ -58,6 +58,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured; import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -85,7 +86,6 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.HostsFileReader;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
@ -196,10 +196,12 @@ import com.google.common.base.Preconditions;
@InterfaceAudience.Private @InterfaceAudience.Private
public class Balancer { public class Balancer {
static final Log LOG = LogFactory.getLog(Balancer.class); static final Log LOG = LogFactory.getLog(Balancer.class);
final private static long GB = 1L << 30; //1GB
final private static long MAX_SIZE_TO_MOVE = 10*GB; private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
final private static long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
private static long WIN_WIDTH = 5400*1000L; // 1.5 hour private static final long GB = 1L << 30; //1GB
private static final long MAX_SIZE_TO_MOVE = 10*GB;
private static final long MAX_BLOCKS_SIZE_TO_FETCH = 2*GB;
/** The maximum number of concurrent blocks moves for /** The maximum number of concurrent blocks moves for
* balancing purpose at a datanode * balancing purpose at a datanode
@ -220,6 +222,8 @@ public class Balancer {
+ "\tIncludes only the specified datanodes."; + "\tIncludes only the specified datanodes.";
private final NameNodeConnector nnc; private final NameNodeConnector nnc;
private final KeyManager keyManager;
private final BalancingPolicy policy; private final BalancingPolicy policy;
private final SaslDataTransferClient saslClient; private final SaslDataTransferClient saslClient;
private final double threshold; private final double threshold;
@ -242,7 +246,8 @@ public class Balancer {
private final Map<Block, BalancerBlock> globalBlockList private final Map<Block, BalancerBlock> globalBlockList
= new HashMap<Block, BalancerBlock>(); = new HashMap<Block, BalancerBlock>();
private final MovedBlocks movedBlocks = new MovedBlocks(); private final MovedBlocks<BalancerDatanode.StorageGroup> movedBlocks;
/** Map (datanodeUuid,storageType -> StorageGroup) */ /** Map (datanodeUuid,storageType -> StorageGroup) */
private final StorageGroupMap storageGroupMap = new StorageGroupMap(); private final StorageGroupMap storageGroupMap = new StorageGroupMap();
@ -327,7 +332,7 @@ public class Balancer {
if (isGoodBlockCandidate(source, target, block)) { if (isGoodBlockCandidate(source, target, block)) {
this.block = block; this.block = block;
if ( chooseProxySource() ) { if ( chooseProxySource() ) {
movedBlocks.add(block); movedBlocks.put(block);
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("Decided to move " + this); LOG.debug("Decided to move " + this);
} }
@ -400,10 +405,10 @@ public class Balancer {
OutputStream unbufOut = sock.getOutputStream(); OutputStream unbufOut = sock.getOutputStream();
InputStream unbufIn = sock.getInputStream(); InputStream unbufIn = sock.getInputStream();
ExtendedBlock eb = new ExtendedBlock(nnc.blockpoolID, block.getBlock()); ExtendedBlock eb = new ExtendedBlock(nnc.getBlockpoolID(), block.getBlock());
Token<BlockTokenIdentifier> accessToken = nnc.getAccessToken(eb); Token<BlockTokenIdentifier> accessToken = keyManager.getAccessToken(eb);
IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut, IOStreamPair saslStreams = saslClient.socketSend(sock, unbufOut,
unbufIn, nnc, accessToken, target.getDatanode()); unbufIn, keyManager, accessToken, target.getDatanode());
unbufOut = saslStreams.out; unbufOut = saslStreams.out;
unbufIn = saslStreams.in; unbufIn = saslStreams.in;
out = new DataOutputStream(new BufferedOutputStream(unbufOut, out = new DataOutputStream(new BufferedOutputStream(unbufOut,
@ -484,47 +489,9 @@ public class Balancer {
} }
/* A class for keeping track of blocks in the Balancer */ /* A class for keeping track of blocks in the Balancer */
static private class BalancerBlock { static class BalancerBlock extends MovedBlocks.Locations<BalancerDatanode.StorageGroup> {
private final Block block; // the block BalancerBlock(Block block) {
/** The locations of the replicas of the block. */ super(block);
private final List<BalancerDatanode.StorageGroup> locations
= new ArrayList<BalancerDatanode.StorageGroup>(3);
/* Constructor */
private BalancerBlock(Block block) {
this.block = block;
}
/* clean block locations */
private synchronized void clearLocations() {
locations.clear();
}
/* add a location */
private synchronized void addLocation(BalancerDatanode.StorageGroup g) {
if (!locations.contains(g)) {
locations.add(g);
}
}
/** @return if the block is located on the given storage group. */
private synchronized boolean isLocatedOn(BalancerDatanode.StorageGroup g) {
return locations.contains(g);
}
/* Return its locations */
private synchronized List<BalancerDatanode.StorageGroup> getLocations() {
return locations;
}
/* Return the block */
private Block getBlock() {
return block;
}
/* Return the length of the block */
private long getNumBytes() {
return block.getNumBytes();
} }
} }
@ -736,7 +703,7 @@ public class Balancer {
*/ */
private long getBlockList() throws IOException { private long getBlockList() throws IOException {
final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive); final long size = Math.min(MAX_BLOCKS_SIZE_TO_FETCH, blocksToReceive);
final BlockWithLocations[] newBlocks = nnc.namenode.getBlocks( final BlockWithLocations[] newBlocks = nnc.getNamenode().getBlocks(
getDatanode(), size).getBlocks(); getDatanode(), size).getBlocks();
long bytesReceived = 0; long bytesReceived = 0;
@ -820,7 +787,7 @@ public class Balancer {
private void filterMovedBlocks() { private void filterMovedBlocks() {
for (Iterator<BalancerBlock> blocks=getBlockIterator(); for (Iterator<BalancerBlock> blocks=getBlockIterator();
blocks.hasNext();) { blocks.hasNext();) {
if (movedBlocks.contains(blocks.next())) { if (movedBlocks.contains(blocks.next().getBlock())) {
blocks.remove(); blocks.remove();
} }
} }
@ -926,6 +893,13 @@ public class Balancer {
this.nodesToBeExcluded = p.nodesToBeExcluded; this.nodesToBeExcluded = p.nodesToBeExcluded;
this.nodesToBeIncluded = p.nodesToBeIncluded; this.nodesToBeIncluded = p.nodesToBeIncluded;
this.nnc = theblockpool; this.nnc = theblockpool;
this.keyManager = nnc.getKeyManager();
final long movedWinWidth = conf.getLong(
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
movedBlocks = new MovedBlocks<BalancerDatanode.StorageGroup>(movedWinWidth);
cluster = NetworkTopology.getInstance(conf); cluster = NetworkTopology.getInstance(conf);
this.moverExecutor = Executors.newFixedThreadPool( this.moverExecutor = Executors.newFixedThreadPool(
@ -1095,36 +1069,6 @@ public class Balancer {
LOG.info(items.size() + " " + name + ": " + items); LOG.info(items.size() + " " + name + ": " + items);
} }
/** A matcher interface for matching nodes. */
private interface Matcher {
/** Given the cluster topology, does the left node match the right node? */
boolean match(NetworkTopology cluster, Node left, Node right);
}
/** Match datanodes in the same node group. */
static final Matcher SAME_NODE_GROUP = new Matcher() {
@Override
public boolean match(NetworkTopology cluster, Node left, Node right) {
return cluster.isOnSameNodeGroup(left, right);
}
};
/** Match datanodes in the same rack. */
static final Matcher SAME_RACK = new Matcher() {
@Override
public boolean match(NetworkTopology cluster, Node left, Node right) {
return cluster.isOnSameRack(left, right);
}
};
/** Match any datanode with any other datanode. */
static final Matcher ANY_OTHER = new Matcher() {
@Override
public boolean match(NetworkTopology cluster, Node left, Node right) {
return left != right;
}
};
/** /**
* Decide all <source, target> pairs and * Decide all <source, target> pairs and
* the number of bytes to move from a source to a target * the number of bytes to move from a source to a target
@ -1135,13 +1079,13 @@ public class Balancer {
private long chooseStorageGroups() { private long chooseStorageGroups() {
// First, match nodes on the same node group if cluster is node group aware // First, match nodes on the same node group if cluster is node group aware
if (cluster.isNodeGroupAware()) { if (cluster.isNodeGroupAware()) {
chooseStorageGroups(SAME_NODE_GROUP); chooseStorageGroups(Matcher.SAME_NODE_GROUP);
} }
// Then, match nodes on the same rack // Then, match nodes on the same rack
chooseStorageGroups(SAME_RACK); chooseStorageGroups(Matcher.SAME_RACK);
// At last, match all remaining nodes // At last, match all remaining nodes
chooseStorageGroups(ANY_OTHER); chooseStorageGroups(Matcher.ANY_OTHER);
Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(), Preconditions.checkState(storageGroupMap.size() >= sources.size() + targets.size(),
"Mismatched number of datanodes (" + storageGroupMap.size() + " < " "Mismatched number of datanodes (" + storageGroupMap.size() + " < "
@ -1308,56 +1252,6 @@ public class Balancer {
} while (shouldWait); } while (shouldWait);
} }
/** This window makes sure to keep blocks that have been moved within 1.5 hour.
* Old window has blocks that are older;
* Current window has blocks that are more recent;
* Cleanup method triggers the check if blocks in the old window are
* more than 1.5 hour old. If yes, purge the old window and then
* move blocks in current window to old window.
*/
private static class MovedBlocks {
private long lastCleanupTime = Time.now();
final private static int CUR_WIN = 0;
final private static int OLD_WIN = 1;
final private static int NUM_WINS = 2;
final private List<HashMap<Block, BalancerBlock>> movedBlocks =
new ArrayList<HashMap<Block, BalancerBlock>>(NUM_WINS);
/* initialize the moved blocks collection */
private MovedBlocks() {
movedBlocks.add(new HashMap<Block,BalancerBlock>());
movedBlocks.add(new HashMap<Block,BalancerBlock>());
}
/* add a block thus marking a block to be moved */
synchronized private void add(BalancerBlock block) {
movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
}
/* check if a block is marked as moved */
synchronized private boolean contains(BalancerBlock block) {
return contains(block.getBlock());
}
/* check if a block is marked as moved */
synchronized private boolean contains(Block block) {
return movedBlocks.get(CUR_WIN).containsKey(block) ||
movedBlocks.get(OLD_WIN).containsKey(block);
}
/* remove old blocks */
synchronized private void cleanup() {
long curTime = Time.now();
// check if old win is older than winWidth
if (lastCleanupTime + WIN_WIDTH <= curTime) {
// purge the old window
movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
movedBlocks.set(CUR_WIN, new HashMap<Block, BalancerBlock>());
lastCleanupTime = curTime;
}
}
}
/* Decide if it is OK to move the given block from source to target /* Decide if it is OK to move the given block from source to target
* A block is a good candidate if * A block is a good candidate if
* 1. the block is not in the process of being moved/has not been moved; * 1. the block is not in the process of being moved/has not been moved;
@ -1370,7 +1264,7 @@ public class Balancer {
return false; return false;
} }
// check if the block is moved or not // check if the block is moved or not
if (movedBlocks.contains(block)) { if (movedBlocks.contains(block.getBlock())) {
return false; return false;
} }
if (block.isLocatedOn(target)) { if (block.isLocatedOn(target)) {
@ -1388,7 +1282,7 @@ public class Balancer {
} else { } else {
boolean notOnSameRack = true; boolean notOnSameRack = true;
synchronized (block) { synchronized (block) {
for (BalancerDatanode.StorageGroup loc : block.locations) { for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) { if (cluster.isOnSameRack(loc.getDatanode(), target.getDatanode())) {
notOnSameRack = false; notOnSameRack = false;
break; break;
@ -1400,7 +1294,7 @@ public class Balancer {
goodBlock = true; goodBlock = true;
} else { } else {
// good if source is on the same rack as on of the replicas // good if source is on the same rack as on of the replicas
for (BalancerDatanode.StorageGroup loc : block.locations) { for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
if (loc != source && if (loc != source &&
cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) { cluster.isOnSameRack(loc.getDatanode(), source.getDatanode())) {
goodBlock = true; goodBlock = true;
@ -1426,7 +1320,7 @@ public class Balancer {
private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target, private boolean isOnSameNodeGroupWithReplicas(BalancerDatanode.StorageGroup target,
BalancerBlock block, Source source) { BalancerBlock block, Source source) {
final DatanodeInfo targetDn = target.getDatanode(); final DatanodeInfo targetDn = target.getDatanode();
for (BalancerDatanode.StorageGroup loc : block.locations) { for (BalancerDatanode.StorageGroup loc : block.getLocations()) {
if (loc != source && if (loc != source &&
cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) { cluster.isOnSameNodeGroup(loc.getDatanode(), targetDn)) {
return true; return true;
@ -1490,7 +1384,7 @@ public class Balancer {
* decide the number of bytes need to be moved * decide the number of bytes need to be moved
*/ */
final long bytesLeftToMove = init( final long bytesLeftToMove = init(
nnc.client.getDatanodeStorageReport(DatanodeReportType.LIVE)); nnc.getClient().getDatanodeStorageReport(DatanodeReportType.LIVE));
if (bytesLeftToMove == 0) { if (bytesLeftToMove == 0) {
System.out.println("The cluster is balanced. Exiting..."); System.out.println("The cluster is balanced. Exiting...");
return ReturnStatus.SUCCESS; return ReturnStatus.SUCCESS;
@ -1560,7 +1454,7 @@ public class Balancer {
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY, DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY,
DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT); DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT);
LOG.info("namenodes = " + namenodes); LOG.info("namenodes = " + namenodes);
LOG.info("p = " + p); LOG.info("parameters = " + p);
final Formatter formatter = new Formatter(System.out); final Formatter formatter = new Formatter(System.out);
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved"); System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
@ -1569,7 +1463,10 @@ public class Balancer {
= new ArrayList<NameNodeConnector>(namenodes.size()); = new ArrayList<NameNodeConnector>(namenodes.size());
try { try {
for (URI uri : namenodes) { for (URI uri : namenodes) {
connectors.add(new NameNodeConnector(uri, conf)); final NameNodeConnector nnc = new NameNodeConnector(
Balancer.class.getSimpleName(), uri, BALANCER_ID_PATH, conf);
nnc.getKeyManager().startBlockKeyUpdater();
connectors.add(nnc);
} }
boolean done = false; boolean done = false;
@ -1731,9 +1628,6 @@ public class Balancer {
public int run(String[] args) { public int run(String[] args) {
final long startTime = Time.now(); final long startTime = Time.now();
final Configuration conf = getConf(); final Configuration conf = getConf();
WIN_WIDTH = conf.getLong(
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
try { try {
checkReplicationPolicyCompatibility(conf); checkReplicationPolicyCompatibility(conf);

View File

@ -0,0 +1,173 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import java.io.Closeable;
import java.io.IOException;
import java.util.EnumSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager.AccessMode;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.StringUtils;
/**
* The class provides utilities for key and token management.
*/
@InterfaceAudience.Private
public class KeyManager implements Closeable, DataEncryptionKeyFactory {
private static final Log LOG = LogFactory.getLog(KeyManager.class);
private final NamenodeProtocol namenode;
private final boolean isBlockTokenEnabled;
private final boolean encryptDataTransfer;
private boolean shouldRun;
private final BlockTokenSecretManager blockTokenSecretManager;
private final BlockKeyUpdater blockKeyUpdater;
private DataEncryptionKey encryptionKey;
public KeyManager(String blockpoolID, NamenodeProtocol namenode,
boolean encryptDataTransfer, Configuration conf) throws IOException {
this.namenode = namenode;
this.encryptDataTransfer = encryptDataTransfer;
final ExportedBlockKeys keys = namenode.getBlockKeys();
this.isBlockTokenEnabled = keys.isBlockTokenEnabled();
if (isBlockTokenEnabled) {
long updateInterval = keys.getKeyUpdateInterval();
long tokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: update interval="
+ StringUtils.formatTime(updateInterval)
+ ", token lifetime=" + StringUtils.formatTime(tokenLifetime));
String encryptionAlgorithm = conf.get(
DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.blockTokenSecretManager = new BlockTokenSecretManager(
updateInterval, tokenLifetime, blockpoolID, encryptionAlgorithm);
this.blockTokenSecretManager.addKeys(keys);
// sync block keys with NN more frequently than NN updates its block keys
this.blockKeyUpdater = new BlockKeyUpdater(updateInterval / 4);
this.shouldRun = true;
} else {
this.blockTokenSecretManager = null;
this.blockKeyUpdater = null;
}
}
public void startBlockKeyUpdater() {
if (blockKeyUpdater != null) {
blockKeyUpdater.daemon.start();
}
}
/** Get an access token for a block. */
public Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
) throws IOException {
if (!isBlockTokenEnabled) {
return BlockTokenSecretManager.DUMMY_TOKEN;
} else {
if (!shouldRun) {
throw new IOException(
"Cannot get access token since BlockKeyUpdater is not running");
}
return blockTokenSecretManager.generateToken(null, eb,
EnumSet.of(AccessMode.REPLACE, AccessMode.COPY));
}
}
@Override
public DataEncryptionKey newDataEncryptionKey() {
if (encryptDataTransfer) {
synchronized (this) {
if (encryptionKey == null) {
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
}
return encryptionKey;
}
} else {
return null;
}
}
@Override
public void close() {
shouldRun = false;
try {
if (blockKeyUpdater != null) {
blockKeyUpdater.daemon.interrupt();
}
} catch(Exception e) {
LOG.warn("Exception shutting down access key updater thread", e);
}
}
/**
* Periodically updates access keys.
*/
class BlockKeyUpdater implements Runnable, Closeable {
private final Daemon daemon = new Daemon(this);
private final long sleepInterval;
BlockKeyUpdater(final long sleepInterval) {
this.sleepInterval = sleepInterval;
LOG.info("Update block keys every " + StringUtils.formatTime(sleepInterval));
}
@Override
public void run() {
try {
while (shouldRun) {
try {
blockTokenSecretManager.addKeys(namenode.getBlockKeys());
} catch (IOException e) {
LOG.error("Failed to set keys", e);
}
Thread.sleep(sleepInterval);
}
} catch (InterruptedException e) {
LOG.debug("InterruptedException in block key updater thread", e);
} catch (Throwable e) {
LOG.error("Exception in block key updater thread", e);
shouldRun = false;
}
}
@Override
public void close() throws IOException {
try {
daemon.interrupt();
} catch(Exception e) {
LOG.warn("Exception shutting down key updater thread", e);
}
}
}
}

View File

@ -0,0 +1,51 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
/** A matcher interface for matching nodes. */
public interface Matcher {
/** Given the cluster topology, does the left node match the right node? */
public boolean match(NetworkTopology cluster, Node left, Node right);
/** Match datanodes in the same node group. */
public static final Matcher SAME_NODE_GROUP = new Matcher() {
@Override
public boolean match(NetworkTopology cluster, Node left, Node right) {
return cluster.isOnSameNodeGroup(left, right);
}
};
/** Match datanodes in the same rack. */
public static final Matcher SAME_RACK = new Matcher() {
@Override
public boolean match(NetworkTopology cluster, Node left, Node right) {
return cluster.isOnSameRack(left, right);
}
};
/** Match any datanode with any other datanode. */
public static final Matcher ANY_OTHER = new Matcher() {
@Override
public boolean match(NetworkTopology cluster, Node left, Node right) {
return left != right;
}
};
}

View File

@ -0,0 +1,124 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.balancer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.util.Time;
/**
* This window makes sure to keep blocks that have been moved within a fixed
* time interval (default is 1.5 hour). Old window has blocks that are older;
* Current window has blocks that are more recent; Cleanup method triggers the
* check if blocks in the old window are more than the fixed time interval. If
* yes, purge the old window and then move blocks in current window to old
* window.
*
* @param <L> Location type
*/
public class MovedBlocks<L> {
/** A class for keeping track of a block and its locations */
public static class Locations<L> {
private final Block block; // the block
/** The locations of the replicas of the block. */
private final List<L> locations = new ArrayList<L>(3);
public Locations(Block block) {
this.block = block;
}
/** clean block locations */
public synchronized void clearLocations() {
locations.clear();
}
/** add a location */
public synchronized void addLocation(L loc) {
if (!locations.contains(loc)) {
locations.add(loc);
}
}
/** @return if the block is located on the given location. */
public synchronized boolean isLocatedOn(L loc) {
return locations.contains(loc);
}
/** @return its locations */
public synchronized List<L> getLocations() {
return locations;
}
/* @return the block */
public Block getBlock() {
return block;
}
/* Return the length of the block */
public long getNumBytes() {
return block.getNumBytes();
}
}
private static final int CUR_WIN = 0;
private static final int OLD_WIN = 1;
private static final int NUM_WINS = 2;
private final long winTimeInterval;
private long lastCleanupTime = Time.monotonicNow();
private final List<Map<Block, Locations<L>>> movedBlocks
= new ArrayList<Map<Block, Locations<L>>>(NUM_WINS);
/** initialize the moved blocks collection */
public MovedBlocks(long winTimeInterval) {
this.winTimeInterval = winTimeInterval;
movedBlocks.add(newMap());
movedBlocks.add(newMap());
}
private Map<Block, Locations<L>> newMap() {
return new HashMap<Block, Locations<L>>();
}
/** add a block thus marking a block to be moved */
public synchronized void put(Locations<L> block) {
movedBlocks.get(CUR_WIN).put(block.getBlock(), block);
}
/** @return if a block is marked as moved */
public synchronized boolean contains(Block block) {
return movedBlocks.get(CUR_WIN).containsKey(block) ||
movedBlocks.get(OLD_WIN).containsKey(block);
}
/** remove old blocks */
public synchronized void cleanup() {
long curTime = Time.monotonicNow();
// check if old win is older than winWidth
if (lastCleanupTime + winTimeInterval <= curTime) {
// purge the old window
movedBlocks.set(OLD_WIN, movedBlocks.get(CUR_WIN));
movedBlocks.set(CUR_WIN, newMap());
lastCleanupTime = curTime;
}
}
}

View File

@ -17,113 +17,96 @@
*/ */
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import java.io.Closeable;
import java.io.DataOutputStream; import java.io.DataOutputStream;
import java.io.IOException; import java.io.IOException;
import java.io.OutputStream; import java.io.OutputStream;
import java.net.InetAddress; import java.net.InetAddress;
import java.net.URI; import java.net.URI;
import java.util.EnumSet;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.NameNodeProxies; import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException; import org.apache.hadoop.hdfs.protocol.AlreadyBeingCreatedException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol; import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.datatransfer.sasl.DataEncryptionKeyFactory;
import org.apache.hadoop.hdfs.security.token.block.DataEncryptionKey;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
import org.apache.hadoop.hdfs.security.token.block.BlockTokenSecretManager;
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.Daemon;
/** /**
* The class provides utilities for {@link Balancer} to access a NameNode * The class provides utilities for accessing a NameNode.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class NameNodeConnector implements DataEncryptionKeyFactory { public class NameNodeConnector implements Closeable {
private static final Log LOG = Balancer.LOG; private static final Log LOG = LogFactory.getLog(NameNodeConnector.class);
private static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
private static final int MAX_NOT_CHANGED_ITERATIONS = 5; private static final int MAX_NOT_CHANGED_ITERATIONS = 5;
final URI nameNodeUri; private final URI nameNodeUri;
final String blockpoolID; private final String blockpoolID;
final NamenodeProtocol namenode; private final NamenodeProtocol namenode;
final ClientProtocol client; private final ClientProtocol client;
final FileSystem fs; private final KeyManager keyManager;
final OutputStream out;
private final FileSystem fs;
private final Path idPath;
private final OutputStream out;
private final boolean isBlockTokenEnabled;
private final boolean encryptDataTransfer;
private boolean shouldRun;
private long keyUpdaterInterval;
// used for balancer
private int notChangedIterations = 0; private int notChangedIterations = 0;
private BlockTokenSecretManager blockTokenSecretManager;
private Daemon keyupdaterthread; // AccessKeyUpdater thread
private DataEncryptionKey encryptionKey;
NameNodeConnector(URI nameNodeUri, public NameNodeConnector(String name, URI nameNodeUri, Path idPath,
Configuration conf) throws IOException { Configuration conf) throws IOException {
this.nameNodeUri = nameNodeUri; this.nameNodeUri = nameNodeUri;
this.idPath = idPath;
this.namenode = this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri,
NameNodeProxies.createProxy(conf, nameNodeUri, NamenodeProtocol.class) NamenodeProtocol.class).getProxy();
.getProxy(); this.client = NameNodeProxies.createProxy(conf, nameNodeUri,
this.client = ClientProtocol.class).getProxy();
NameNodeProxies.createProxy(conf, nameNodeUri, ClientProtocol.class)
.getProxy();
this.fs = FileSystem.get(nameNodeUri, conf); this.fs = FileSystem.get(nameNodeUri, conf);
final NamespaceInfo namespaceinfo = namenode.versionRequest(); final NamespaceInfo namespaceinfo = namenode.versionRequest();
this.blockpoolID = namespaceinfo.getBlockPoolID(); this.blockpoolID = namespaceinfo.getBlockPoolID();
final ExportedBlockKeys keys = namenode.getBlockKeys(); final FsServerDefaults defaults = fs.getServerDefaults(new Path("/"));
this.isBlockTokenEnabled = keys.isBlockTokenEnabled(); this.keyManager = new KeyManager(blockpoolID, namenode,
if (isBlockTokenEnabled) { defaults.getEncryptDataTransfer(), conf);
long blockKeyUpdateInterval = keys.getKeyUpdateInterval();
long blockTokenLifetime = keys.getTokenLifetime();
LOG.info("Block token params received from NN: keyUpdateInterval="
+ blockKeyUpdateInterval / (60 * 1000) + " min(s), tokenLifetime="
+ blockTokenLifetime / (60 * 1000) + " min(s)");
String encryptionAlgorithm = conf.get(
DFSConfigKeys.DFS_DATA_ENCRYPTION_ALGORITHM_KEY);
this.blockTokenSecretManager = new BlockTokenSecretManager(
blockKeyUpdateInterval, blockTokenLifetime, blockpoolID,
encryptionAlgorithm);
this.blockTokenSecretManager.addKeys(keys);
/*
* Balancer should sync its block keys with NN more frequently than NN
* updates its block keys
*/
this.keyUpdaterInterval = blockKeyUpdateInterval / 4;
LOG.info("Balancer will update its block keys every "
+ keyUpdaterInterval / (60 * 1000) + " minute(s)");
this.keyupdaterthread = new Daemon(new BlockKeyUpdater());
this.shouldRun = true;
this.keyupdaterthread.start();
}
this.encryptDataTransfer = fs.getServerDefaults(new Path("/"))
.getEncryptDataTransfer();
// Check if there is another balancer running.
// Exit if there is another one running. // Exit if there is another one running.
out = checkAndMarkRunningBalancer(); out = checkAndMarkRunning();
if (out == null) { if (out == null) {
throw new IOException("Another balancer is running"); throw new IOException("Another " + name + " is running.");
} }
} }
boolean shouldContinue(long dispatchBlockMoveBytes) { /** @return the block pool ID */
public String getBlockpoolID() {
return blockpoolID;
}
/** @return the namenode proxy. */
public NamenodeProtocol getNamenode() {
return namenode;
}
/** @return the client proxy. */
public ClientProtocol getClient() {
return client;
}
/** @return the key manager */
public KeyManager getKeyManager() {
return keyManager;
}
/** Should the instance continue running? */
public boolean shouldContinue(long dispatchBlockMoveBytes) {
if (dispatchBlockMoveBytes > 0) { if (dispatchBlockMoveBytes > 0) {
notChangedIterations = 0; notChangedIterations = 0;
} else { } else {
@ -137,53 +120,25 @@ class NameNodeConnector implements DataEncryptionKeyFactory {
return true; return true;
} }
/** Get an access token for a block. */
Token<BlockTokenIdentifier> getAccessToken(ExtendedBlock eb
) throws IOException {
if (!isBlockTokenEnabled) {
return BlockTokenSecretManager.DUMMY_TOKEN;
} else {
if (!shouldRun) {
throw new IOException(
"Can not get access token. BlockKeyUpdater is not running");
}
return blockTokenSecretManager.generateToken(null, eb,
EnumSet.of(BlockTokenSecretManager.AccessMode.REPLACE,
BlockTokenSecretManager.AccessMode.COPY));
}
}
@Override /**
public DataEncryptionKey newDataEncryptionKey() { * The idea for making sure that there is no more than one instance
if (encryptDataTransfer) {
synchronized (this) {
if (encryptionKey == null) {
encryptionKey = blockTokenSecretManager.generateDataEncryptionKey();
}
return encryptionKey;
}
} else {
return null;
}
}
/* The idea for making sure that there is no more than one balancer
* running in an HDFS is to create a file in the HDFS, writes the hostname * running in an HDFS is to create a file in the HDFS, writes the hostname
* of the machine on which the balancer is running to the file, but did not * of the machine on which the instance is running to the file, but did not
* close the file until the balancer exits. * close the file until it exits.
* This prevents the second balancer from running because it can not *
* This prevents the second instance from running because it can not
* creates the file while the first one is running. * creates the file while the first one is running.
* *
* This method checks if there is any running balancer and * This method checks if there is any running instance. If no, mark yes.
* if no, mark yes if no.
* Note that this is an atomic operation. * Note that this is an atomic operation.
* *
* Return null if there is a running balancer; otherwise the output stream * @return null if there is a running instance;
* to the newly created file. * otherwise, the output stream to the newly created file.
*/ */
private OutputStream checkAndMarkRunningBalancer() throws IOException { private OutputStream checkAndMarkRunning() throws IOException {
try { try {
final DataOutputStream out = fs.create(BALANCER_ID_PATH); final DataOutputStream out = fs.create(idPath);
out.writeBytes(InetAddress.getLocalHost().getHostName()); out.writeBytes(InetAddress.getLocalHost().getHostName());
out.flush(); out.flush();
return out; return out;
@ -196,24 +151,17 @@ class NameNodeConnector implements DataEncryptionKeyFactory {
} }
} }
/** Close the connection. */ @Override
void close() { public void close() {
shouldRun = false; keyManager.close();
try {
if (keyupdaterthread != null) {
keyupdaterthread.interrupt();
}
} catch(Exception e) {
LOG.warn("Exception shutting down access key updater thread", e);
}
// close the output file // close the output file
IOUtils.closeStream(out); IOUtils.closeStream(out);
if (fs != null) { if (fs != null) {
try { try {
fs.delete(BALANCER_ID_PATH, true); fs.delete(idPath, true);
} catch(IOException ioe) { } catch(IOException ioe) {
LOG.warn("Failed to delete " + BALANCER_ID_PATH, ioe); LOG.warn("Failed to delete " + idPath, ioe);
} }
} }
} }
@ -221,31 +169,6 @@ class NameNodeConnector implements DataEncryptionKeyFactory {
@Override @Override
public String toString() { public String toString() {
return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri return getClass().getSimpleName() + "[namenodeUri=" + nameNodeUri
+ ", id=" + blockpoolID + ", bpid=" + blockpoolID + "]";
+ "]";
}
/**
* Periodically updates access keys.
*/
class BlockKeyUpdater implements Runnable {
@Override
public void run() {
try {
while (shouldRun) {
try {
blockTokenSecretManager.addKeys(namenode.getBlockKeys());
} catch (IOException e) {
LOG.error("Failed to set keys", e);
}
Thread.sleep(keyUpdaterInterval);
}
} catch (InterruptedException e) {
LOG.debug("InterruptedException in block key updater thread", e);
} catch (Throwable e) {
LOG.error("Exception in block key updater thread", e);
shouldRun = false;
}
}
} }
} }