HDFS-15665. Balancer logging improvements. Contributed by Konstantin V Shvachko.
(cherry picked from commit d07dc7afb4)
This commit is contained in:
parent ac82334041
commit a9bf374380
|
@ -34,8 +34,6 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -64,7 +62,8 @@ import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.util.Tool;
|
import org.apache.hadoop.util.Tool;
|
||||||
import org.apache.hadoop.util.ToolRunner;
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
import static com.google.common.base.Preconditions.checkArgument;
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
|
@ -171,7 +170,7 @@ import com.google.common.base.Preconditions;
|
||||||
|
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class Balancer {
|
public class Balancer {
|
||||||
static final Log LOG = LogFactory.getLog(Balancer.class);
|
static final Logger LOG = LoggerFactory.getLogger(Balancer.class);
|
||||||
|
|
||||||
static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
static final Path BALANCER_ID_PATH = new Path("/system/balancer.id");
|
||||||
|
|
||||||
|
@ -261,6 +260,9 @@ public class Balancer {
|
||||||
*/
|
*/
|
||||||
Balancer(NameNodeConnector theblockpool, BalancerParameters p,
|
Balancer(NameNodeConnector theblockpool, BalancerParameters p,
|
||||||
Configuration conf) {
|
Configuration conf) {
|
||||||
|
// NameNode configuration parameters for balancing
|
||||||
|
getInt(conf, DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY,
|
||||||
|
DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT);
|
||||||
final long movedWinWidth = getLong(conf,
|
final long movedWinWidth = getLong(conf,
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
|
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY,
|
||||||
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
|
DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_DEFAULT);
|
||||||
|
@ -270,10 +272,6 @@ public class Balancer {
|
||||||
final int dispatcherThreads = getInt(conf,
|
final int dispatcherThreads = getInt(conf,
|
||||||
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
|
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY,
|
||||||
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
|
DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_DEFAULT);
|
||||||
final int maxConcurrentMovesPerNode = getInt(conf,
|
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
|
||||||
|
|
||||||
final long getBlocksSize = getLongBytes(conf,
|
final long getBlocksSize = getLongBytes(conf,
|
||||||
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY,
|
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_KEY,
|
||||||
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT);
|
DFSConfigKeys.DFS_BALANCER_GETBLOCKS_SIZE_DEFAULT);
|
||||||
|
@ -290,6 +288,13 @@ public class Balancer {
|
||||||
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
|
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
|
||||||
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
|
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
|
||||||
|
|
||||||
|
// DataNode configuration parameters for balancing
|
||||||
|
final int maxConcurrentMovesPerNode = getInt(conf,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
||||||
|
getLongBytes(conf, DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_DEFAULT);
|
||||||
|
|
||||||
this.nnc = theblockpool;
|
this.nnc = theblockpool;
|
||||||
this.dispatcher =
|
this.dispatcher =
|
||||||
new Dispatcher(theblockpool, p.getIncludedNodes(),
|
new Dispatcher(theblockpool, p.getIncludedNodes(),
|
||||||
|
@ -582,12 +587,13 @@ public class Balancer {
|
||||||
this.bytesAlreadyMoved = bytesAlreadyMoved;
|
this.bytesAlreadyMoved = bytesAlreadyMoved;
|
||||||
}
|
}
|
||||||
|
|
||||||
void print(int iteration, PrintStream out) {
|
void print(int iteration, NameNodeConnector nnc, PrintStream out) {
|
||||||
out.printf("%-24s %10d %19s %18s %17s%n",
|
out.printf("%-24s %10d %19s %18s %17s %s%n",
|
||||||
DateFormat.getDateTimeInstance().format(new Date()), iteration,
|
DateFormat.getDateTimeInstance().format(new Date()), iteration,
|
||||||
StringUtils.byteDesc(bytesAlreadyMoved),
|
StringUtils.byteDesc(bytesAlreadyMoved),
|
||||||
StringUtils.byteDesc(bytesLeftToMove),
|
StringUtils.byteDesc(bytesLeftToMove),
|
||||||
StringUtils.byteDesc(bytesBeingMoved));
|
StringUtils.byteDesc(bytesBeingMoved),
|
||||||
|
nnc.getNameNodeUri());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -633,8 +639,10 @@ public class Balancer {
|
||||||
System.out.println("No block can be moved. Exiting...");
|
System.out.println("No block can be moved. Exiting...");
|
||||||
return newResult(ExitStatus.NO_MOVE_BLOCK, bytesLeftToMove, bytesBeingMoved);
|
return newResult(ExitStatus.NO_MOVE_BLOCK, bytesLeftToMove, bytesBeingMoved);
|
||||||
} else {
|
} else {
|
||||||
LOG.info( "Will move " + StringUtils.byteDesc(bytesBeingMoved) +
|
LOG.info("Will move {} in this iteration for {}",
|
||||||
" in this iteration");
|
StringUtils.byteDesc(bytesBeingMoved), nnc.toString());
|
||||||
|
LOG.info("Total target DataNodes in this iteration: {}",
|
||||||
|
dispatcher.moveTasksTotal());
|
||||||
}
|
}
|
||||||
|
|
||||||
/* For each pair of <source, target>, start a thread that repeatedly
|
/* For each pair of <source, target>, start a thread that repeatedly
|
||||||
|
@ -681,7 +689,9 @@ public class Balancer {
|
||||||
LOG.info("excluded nodes = " + p.getExcludedNodes());
|
LOG.info("excluded nodes = " + p.getExcludedNodes());
|
||||||
LOG.info("source nodes = " + p.getSourceNodes());
|
LOG.info("source nodes = " + p.getSourceNodes());
|
||||||
checkKeytabAndInit(conf);
|
checkKeytabAndInit(conf);
|
||||||
System.out.println("Time Stamp Iteration# Bytes Already Moved Bytes Left To Move Bytes Being Moved");
|
System.out.println("Time Stamp Iteration#"
|
||||||
|
+ " Bytes Already Moved Bytes Left To Move Bytes Being Moved"
|
||||||
|
+ " NameNode");
|
||||||
|
|
||||||
List<NameNodeConnector> connectors = Collections.emptyList();
|
List<NameNodeConnector> connectors = Collections.emptyList();
|
||||||
try {
|
try {
|
||||||
|
@ -698,7 +708,7 @@ public class Balancer {
|
||||||
|| p.getBlockPools().contains(nnc.getBlockpoolID())) {
|
|| p.getBlockPools().contains(nnc.getBlockpoolID())) {
|
||||||
final Balancer b = new Balancer(nnc, p, conf);
|
final Balancer b = new Balancer(nnc, p, conf);
|
||||||
final Result r = b.runOneIteration();
|
final Result r = b.runOneIteration();
|
||||||
r.print(iteration, System.out);
|
r.print(iteration, nnc, System.out);
|
||||||
|
|
||||||
// clean all lists
|
// clean all lists
|
||||||
b.resetData(conf);
|
b.resetData(conf);
|
||||||
|
@ -718,7 +728,7 @@ public class Balancer {
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
for(NameNodeConnector nnc : connectors) {
|
for(NameNodeConnector nnc : connectors) {
|
||||||
IOUtils.cleanup(LOG, nnc);
|
IOUtils.cleanupWithLogger(LOG, nnc);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ExitStatus.SUCCESS.getExitCode();
|
return ExitStatus.SUCCESS.getExitCode();
|
||||||
|
|
|
@ -42,11 +42,8 @@ import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
|
||||||
import org.apache.hadoop.fs.StorageType;
|
import org.apache.hadoop.fs.StorageType;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
@ -77,6 +74,8 @@ import org.apache.hadoop.net.NetworkTopology;
|
||||||
import org.apache.hadoop.security.token.Token;
|
import org.apache.hadoop.security.token.Token;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
@ -84,7 +83,7 @@ import com.google.common.base.Preconditions;
|
||||||
/** Dispatching block replica moves between datanodes. */
|
/** Dispatching block replica moves between datanodes. */
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class Dispatcher {
|
public class Dispatcher {
|
||||||
static final Log LOG = LogFactory.getLog(Dispatcher.class);
|
static final Logger LOG = LoggerFactory.getLogger(Dispatcher.class);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* the period of time to delay the usage of a DataNode after hitting
|
* the period of time to delay the usage of a DataNode after hitting
|
||||||
|
@ -378,7 +377,7 @@ public class Dispatcher {
|
||||||
|
|
||||||
sendRequest(out, eb, accessToken);
|
sendRequest(out, eb, accessToken);
|
||||||
receiveResponse(in);
|
receiveResponse(in);
|
||||||
nnc.getBytesMoved().addAndGet(block.getNumBytes());
|
nnc.addBytesMoved(block.getNumBytes());
|
||||||
target.getDDatanode().setHasSuccess();
|
target.getDDatanode().setHasSuccess();
|
||||||
LOG.info("Successfully moved " + this);
|
LOG.info("Successfully moved " + this);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
|
@ -967,6 +966,10 @@ public class Dispatcher {
|
||||||
return nnc.getBytesMoved().get();
|
return nnc.getBytesMoved().get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
long getBblocksMoved() {
|
||||||
|
return nnc.getBlocksMoved().get();
|
||||||
|
}
|
||||||
|
|
||||||
long bytesToMove() {
|
long bytesToMove() {
|
||||||
Preconditions.checkState(
|
Preconditions.checkState(
|
||||||
storageGroupMap.size() >= sources.size() + targets.size(),
|
storageGroupMap.size() >= sources.size() + targets.size(),
|
||||||
|
@ -986,6 +989,14 @@ public class Dispatcher {
|
||||||
targets.add(target);
|
targets.add(target);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int moveTasksTotal() {
|
||||||
|
int b = 0;
|
||||||
|
for (Source src : sources) {
|
||||||
|
b += src.tasks.size();
|
||||||
|
}
|
||||||
|
return b;
|
||||||
|
}
|
||||||
|
|
||||||
private boolean shouldIgnore(DatanodeInfo dn) {
|
private boolean shouldIgnore(DatanodeInfo dn) {
|
||||||
// ignore out-of-service nodes
|
// ignore out-of-service nodes
|
||||||
final boolean outOfService = !dn.isInService();
|
final boolean outOfService = !dn.isInService();
|
||||||
|
@ -1067,12 +1078,13 @@ public class Dispatcher {
|
||||||
*/
|
*/
|
||||||
private long dispatchBlockMoves() throws InterruptedException {
|
private long dispatchBlockMoves() throws InterruptedException {
|
||||||
final long bytesLastMoved = getBytesMoved();
|
final long bytesLastMoved = getBytesMoved();
|
||||||
|
final long blocksLastMoved = getBblocksMoved();
|
||||||
final Future<?>[] futures = new Future<?>[sources.size()];
|
final Future<?>[] futures = new Future<?>[sources.size()];
|
||||||
|
|
||||||
int concurrentThreads = Math.min(sources.size(),
|
int concurrentThreads = Math.min(sources.size(),
|
||||||
((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
|
((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize());
|
||||||
assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
|
assert concurrentThreads > 0 : "Number of concurrent threads is 0.";
|
||||||
LOG.debug("Balancer concurrent dispatcher threads " + concurrentThreads);
|
LOG.info("Balancer concurrent dispatcher threads = {}", concurrentThreads);
|
||||||
|
|
||||||
// Determine the size of each mover thread pool per target
|
// Determine the size of each mover thread pool per target
|
||||||
int threadsPerTarget = maxMoverThreads/targets.size();
|
int threadsPerTarget = maxMoverThreads/targets.size();
|
||||||
|
@ -1114,6 +1126,9 @@ public class Dispatcher {
|
||||||
|
|
||||||
// wait for all block moving to be done
|
// wait for all block moving to be done
|
||||||
waitForMoveCompletion(targets);
|
waitForMoveCompletion(targets);
|
||||||
|
LOG.info("Total bytes (blocks) moved in this iteration {} ({})",
|
||||||
|
StringUtils.byteDesc(getBytesMoved() - bytesLastMoved),
|
||||||
|
(getBblocksMoved() - blocksLastMoved));
|
||||||
|
|
||||||
return getBytesMoved() - bytesLastMoved;
|
return getBytesMoved() - bytesLastMoved;
|
||||||
}
|
}
|
||||||
|
|
|
@ -112,6 +112,7 @@ public class NameNodeConnector implements Closeable {
|
||||||
private final OutputStream out;
|
private final OutputStream out;
|
||||||
private final List<Path> targetPaths;
|
private final List<Path> targetPaths;
|
||||||
private final AtomicLong bytesMoved = new AtomicLong();
|
private final AtomicLong bytesMoved = new AtomicLong();
|
||||||
|
private final AtomicLong blocksMoved = new AtomicLong();
|
||||||
|
|
||||||
private final int maxNotChangedIterations;
|
private final int maxNotChangedIterations;
|
||||||
private int notChangedIterations = 0;
|
private int notChangedIterations = 0;
|
||||||
|
@ -168,6 +169,19 @@ public class NameNodeConnector implements Closeable {
|
||||||
return bytesMoved;
|
return bytesMoved;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
AtomicLong getBlocksMoved() {
|
||||||
|
return blocksMoved;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void addBytesMoved(long numBytes) {
|
||||||
|
bytesMoved.addAndGet(numBytes);
|
||||||
|
blocksMoved.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
public URI getNameNodeUri() {
|
||||||
|
return nameNodeUri;
|
||||||
|
}
|
||||||
|
|
||||||
/** @return blocks with locations. */
|
/** @return blocks with locations. */
|
||||||
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
|
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
|
@ -972,7 +972,7 @@ public class TestBalancer {
|
||||||
for(NameNodeConnector nnc : connectors) {
|
for(NameNodeConnector nnc : connectors) {
|
||||||
final Balancer b = new Balancer(nnc, p, conf);
|
final Balancer b = new Balancer(nnc, p, conf);
|
||||||
final Result r = b.runOneIteration();
|
final Result r = b.runOneIteration();
|
||||||
r.print(iteration, System.out);
|
r.print(iteration, nnc, System.out);
|
||||||
|
|
||||||
// clean all lists
|
// clean all lists
|
||||||
b.resetData(conf);
|
b.resetData(conf);
|
||||||
|
|
|
@ -30,7 +30,7 @@ import java.util.Random;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.slf4j.Logger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileSystem;
|
import org.apache.hadoop.fs.FileSystem;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
|
@ -50,7 +50,7 @@ import org.apache.hadoop.hdfs.server.balancer.BalancerParameters;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.slf4j.event.Level;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
@ -58,10 +58,10 @@ import org.junit.Test;
|
||||||
* Test balancer with multiple NameNodes
|
* Test balancer with multiple NameNodes
|
||||||
*/
|
*/
|
||||||
public class TestBalancerWithMultipleNameNodes {
|
public class TestBalancerWithMultipleNameNodes {
|
||||||
static final Log LOG = Balancer.LOG;
|
static final Logger LOG = Balancer.LOG;
|
||||||
{
|
{
|
||||||
GenericTestUtils.setLogLevel(LOG, Level.ALL);
|
GenericTestUtils.setLogLevel(LOG, Level.TRACE);
|
||||||
DFSTestUtil.setNameNodeLogLevel(Level.ALL);
|
DFSTestUtil.setNameNodeLogLevel(org.apache.log4j.Level.TRACE);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@ -309,7 +309,7 @@ public class TestBalancerWithMultipleNameNodes {
|
||||||
try {
|
try {
|
||||||
Thread.sleep(ms);
|
Thread.sleep(ms);
|
||||||
} catch(InterruptedException e) {
|
} catch(InterruptedException e) {
|
||||||
LOG.error(e);
|
LOG.error("{}", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue