+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.commons.logging.Log;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Timer;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * This is a debugging class that can be used by callers to track
+ * whether a specific lock is being held for too long and, if so,
+ * periodically log a warning together with a stack trace.
+ *
+ * The logged warnings are throttled so that logs are not spammed.
+ *
+ * A new instance of InstrumentedLock can be created for each object
+ * that needs to be instrumented.
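+ *
+ * A minimal usage sketch (the lock name, logger and the two timing
+ * values below are illustrative placeholders):
+ * <pre>
+ *   // Warn when held longer than 4000 ms, at most once every 300 ms.
+ *   Lock lock = new InstrumentedLock("MyResource", LOG, 300, 4000);
+ *   lock.lock();
+ *   try {
+ *     // ... critical section ...
+ *   } finally {
+ *     lock.unlock(); // logs a throttled warning if held too long
+ *   }
+ * </pre>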
+ */
+@InterfaceAudience.Private
+@InterfaceStability.Unstable
+public class InstrumentedLock implements Lock {
+
+ private final Lock lock;
+ private final Log logger;
+ private final String name;
+ private final Timer clock;
+
+ /** Minimum gap between two lock warnings. */
+ private final long minLoggingGap;
+ /** Threshold for detecting long lock held time. */
+ private final long lockWarningThreshold;
+
+ // Tracking counters for lock statistics.
+ private volatile long lockAcquireTimestamp;
+ private final AtomicLong lastLogTimestamp;
+ private final AtomicLong warningsSuppressed = new AtomicLong(0);
+
+ /**
+ * Create an instrumented lock instance that logs a warning message
+ * when the lock held time exceeds the given threshold.
+ *
+ * @param name the identifier of the lock object
+ * @param logger this class does not have its own logger; warnings are
+ * written to the given logger instead
+ * @param minLoggingGapMs the minimum time gap between two log messages;
+ * this avoids spamming the logs with too many warnings
+ * @param lockWarningThresholdMs the threshold above which the lock held
+ * time is considered "too long"
+ */
+ public InstrumentedLock(String name, Log logger, long minLoggingGapMs,
+ long lockWarningThresholdMs) {
+ this(name, logger, new ReentrantLock(),
+ minLoggingGapMs, lockWarningThresholdMs);
+ }
+
+ public InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs) {
+ this(name, logger, lock,
+ minLoggingGapMs, lockWarningThresholdMs, new Timer());
+ }
+
+ @VisibleForTesting
+ InstrumentedLock(String name, Log logger, Lock lock,
+ long minLoggingGapMs, long lockWarningThresholdMs, Timer clock) {
+ this.name = name;
+ this.lock = lock;
+ this.clock = clock;
+ this.logger = logger;
+ minLoggingGap = minLoggingGapMs;
+ lockWarningThreshold = lockWarningThresholdMs;
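+ // Start the last-log timestamp far enough in the past that the very
+ // first over-threshold hold is logged rather than suppressed.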
+ lastLogTimestamp = new AtomicLong(
+ clock.monotonicNow() - Math.max(minLoggingGap, lockWarningThreshold));
+ }
+
+ @Override
+ public void lock() {
+ lock.lock();
+ lockAcquireTimestamp = clock.monotonicNow();
+ }
+
+ @Override
+ public void lockInterruptibly() throws InterruptedException {
+ lock.lockInterruptibly();
+ lockAcquireTimestamp = clock.monotonicNow();
+ }
+
+ @Override
+ public boolean tryLock() {
+ if (lock.tryLock()) {
+ lockAcquireTimestamp = clock.monotonicNow();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
+ if (lock.tryLock(time, unit)) {
+ lockAcquireTimestamp = clock.monotonicNow();
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public void unlock() {
+ long localLockReleaseTime = clock.monotonicNow();
+ long localLockAcquireTime = lockAcquireTimestamp;
+ lock.unlock();
+ check(localLockAcquireTime, localLockReleaseTime);
+ }
+
+ @Override
+ public Condition newCondition() {
+ return lock.newCondition();
+ }
+
+ @VisibleForTesting
+ void logWarning(long lockHeldTime, long suppressed) {
+ logger.warn(String.format("Lock held time above threshold: " +
+ "lock identifier: %s " +
+ "lockHeldTimeMs=%d ms. Suppressed %d lock warnings. " +
+ "The stack trace is: %s" ,
+ name, lockHeldTime, suppressed,
+ StringUtils.getStackTrace(Thread.currentThread())));
+ }
+
+ /**
+ * Log a warning if the lock was held for too long.
+ *
+ * Should be invoked by the caller immediately AFTER releasing the lock.
+ *
+ * @param acquireTime - timestamp just after acquiring the lock.
+ * @param releaseTime - timestamp just before releasing the lock.
+ */
+ private void check(long acquireTime, long releaseTime) {
+ if (!logger.isWarnEnabled()) {
+ return;
+ }
+
+ final long lockHeldTime = releaseTime - acquireTime;
+ if (lockWarningThreshold - lockHeldTime < 0) {
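+ // Rate-limit the warnings: only the thread that successfully advances
+ // lastLogTimestamp logs; the others just bump the suppressed counter.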
+ long now;
+ long localLastLogTs;
+ do {
+ now = clock.monotonicNow();
+ localLastLogTs = lastLogTimestamp.get();
+ long deltaSinceLastLog = now - localLastLogTs;
+ // Check whether this warning should be logged or suppressed.
+ if (deltaSinceLastLog - minLoggingGap < 0) {
+ warningsSuppressed.incrementAndGet();
+ return;
+ }
+ } while (!lastLogTimestamp.compareAndSet(localLastLogTs, now));
+ long suppressed = warningsSuppressed.getAndSet(0);
+ logWarning(lockHeldTime, suppressed);
+ }
+ }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
index 1362c0bba95..3a12d74120a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java
@@ -2624,7 +2624,7 @@ public class BlockManager implements BlockStatsMXBean {
} while (storageBlock != null);
}
- // Iterate any remaing blocks that have not been reported and remove them
+ // Iterate any remaining blocks that have not been reported and remove them
while (storageBlocksIterator.hasNext()) {
toRemove.add(storageBlocksIterator.next());
}
@@ -2677,7 +2677,7 @@ public class BlockManager implements BlockStatsMXBean {
corruptReplicas.isReplicaCorrupt(storedBlock, dn))) {
// Add replica if appropriate. If the replica was previously corrupt
// but now okay, it might need to be updated.
- toAdd.add(new BlockInfoToAdd(storedBlock, replica));
+ toAdd.add(new BlockInfoToAdd(storedBlock, new Block(replica)));
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
index 4bde758bc19..f3247fca27d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java
@@ -1179,10 +1179,11 @@ class BPServiceActor implements Runnable {
resetBlockReportTime = false;
} else {
/* say the last block report was at 8:20:14. The current report
- * should have started around 9:20:14 (default 1 hour interval).
+ * should have started around 14:20:14 (default 6 hour interval).
* If current time is :
- * 1) normal like 9:20:18, next report should be at 10:20:14
- * 2) unexpected like 11:35:43, next report should be at 12:20:14
+ * 1) normal like 14:20:18, next report should be at 20:20:14.
+ * 2) unexpected like 21:35:43, next report should be at 2:20:14
+ * on the next day.
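+ * In either case the expression below advances nextBlockReportTime by
+ * a whole number of intervals, preserving the original offset
+ * (xx:20:14) within the block report interval.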
*/
nextBlockReportTime +=
(((monotonicNow() - nextBlockReportTime + blockReportIntervalMs) /
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
index ec72d97d5c7..d853ae945e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DiskBalancer.java
@@ -121,17 +121,23 @@ public class DiskBalancer {
*/
public void shutdown() {
lock.lock();
+ boolean needShutdown = false;
try {
this.isDiskBalancerEnabled = false;
this.currentResult = Result.NO_PLAN;
if ((this.future != null) && (!this.future.isDone())) {
this.currentResult = Result.PLAN_CANCELLED;
this.blockMover.setExitFlag();
- shutdownExecutor();
+ scheduler.shutdown();
+ needShutdown = true;
}
} finally {
lock.unlock();
}
+ // no need to hold lock while shutting down executor.
+ if (needShutdown) {
+ shutdownExecutor();
+ }
}
/**
@@ -139,7 +145,6 @@ public class DiskBalancer {
*/
private void shutdownExecutor() {
final int secondsTowait = 10;
- scheduler.shutdown();
try {
if (!scheduler.awaitTermination(secondsTowait, TimeUnit.SECONDS)) {
scheduler.shutdownNow();
@@ -228,6 +233,7 @@ public class DiskBalancer {
*/
public void cancelPlan(String planID) throws DiskBalancerException {
lock.lock();
+ boolean needShutdown = false;
try {
checkDiskBalancerEnabled();
if (this.planID == null ||
@@ -239,13 +245,18 @@ public class DiskBalancer {
DiskBalancerException.Result.NO_SUCH_PLAN);
}
if (!this.future.isDone()) {
- this.blockMover.setExitFlag();
- shutdownExecutor();
this.currentResult = Result.PLAN_CANCELLED;
+ this.blockMover.setExitFlag();
+ scheduler.shutdown();
+ needShutdown = true;
}
} finally {
lock.unlock();
}
+ // no need to hold lock while shutting down executor.
+ if (needShutdown) {
+ shutdownExecutor();
+ }
}
/**
@@ -490,14 +501,11 @@ public class DiskBalancer {
public void run() {
Thread.currentThread().setName("DiskBalancerThread");
LOG.info("Executing Disk balancer plan. Plan File: {}, Plan ID: {}",
- planFile, planID);
- try {
- for (Map.Entry entry :
- workMap.entrySet()) {
- blockMover.copyBlocks(entry.getKey(), entry.getValue());
- }
- } finally {
- blockMover.setExitFlag();
+ planFile, planID);
+ for (Map.Entry<VolumePair, DiskBalancerWorkItem> entry :
+ workMap.entrySet()) {
+ blockMover.setRunnable();
+ blockMover.copyBlocks(entry.getKey(), entry.getValue());
}
}
});
@@ -846,8 +854,8 @@ public class DiskBalancer {
if (item.getErrorCount() >= getMaxError(item)) {
item.setErrMsg("Error count exceeded.");
- LOG.info("Maximum error count exceeded. Error count: {} Max error:{} "
- , item.getErrorCount(), item.getMaxDiskErrors());
+ LOG.info("Maximum error count exceeded. Error count: {} Max error:{} ",
+ item.getErrorCount(), item.getMaxDiskErrors());
}
return null;
@@ -951,7 +959,8 @@ public class DiskBalancer {
LOG.error("Exceeded the max error count. source {}, dest: {} " +
"error count: {}", source.getBasePath(),
dest.getBasePath(), item.getErrorCount());
- break;
+ this.setExitFlag();
+ continue;
}
// Check for the block tolerance constraint.
@@ -960,7 +969,8 @@ public class DiskBalancer {
"blocks.",
source.getBasePath(), dest.getBasePath(),
item.getBytesCopied(), item.getBlocksCopied());
- break;
+ this.setExitFlag();
+ continue;
}
ExtendedBlock block = getNextBlock(poolIters, item);
@@ -968,7 +978,8 @@ public class DiskBalancer {
if (block == null) {
LOG.error("No source blocks, exiting the copy. Source: {}, " +
"dest:{}", source.getBasePath(), dest.getBasePath());
- break;
+ this.setExitFlag();
+ continue;
}
// check if someone told us exit, treat this as an interruption
@@ -976,7 +987,7 @@ public class DiskBalancer {
// for the thread, since both getNextBlock and moveBlocAcrossVolume
// can take some time.
if (!shouldRun()) {
- break;
+ continue;
}
long timeUsed;
@@ -995,7 +1006,8 @@ public class DiskBalancer {
LOG.error("Destination volume: {} does not have enough space to" +
" accommodate a block. Block Size: {} Exiting from" +
" copyBlocks.", dest.getBasePath(), block.getNumBytes());
- break;
+ this.setExitFlag();
+ continue;
}
LOG.debug("Moved block with size {} from {} to {}",
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
index ffd2f8ab34d..e9f1dc13a8a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java
@@ -40,6 +40,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.TimeUnit;
import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
@@ -60,6 +62,7 @@ import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.InstrumentedLock;
import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.BlockListAsLongs;
@@ -267,6 +270,7 @@ class FsDatasetImpl implements FsDatasetSpi {
private final int maxDataLength;
private final AutoCloseableLock datasetLock;
+ private final Condition datasetLockCondition;
/**
* An FSDataset has a directory where it loads its data files.
@@ -278,7 +282,15 @@ class FsDatasetImpl implements FsDatasetSpi {
this.dataStorage = storage;
this.conf = conf;
this.smallBufferSize = DFSUtilClient.getSmallBufferSize(conf);
- this.datasetLock = new AutoCloseableLock();
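+ // Wrap the dataset lock in an InstrumentedLock so that holding it for
+ // longer than 300 ms logs a rate-limited warning with a stack trace.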
+ this.datasetLock = new AutoCloseableLock(
+ new InstrumentedLock(getClass().getName(), LOG,
+ conf.getTimeDuration(
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_KEY,
+ DFSConfigKeys.DFS_LOCK_SUPPRESS_WARNING_INTERVAL_DEFAULT,
+ TimeUnit.MILLISECONDS),
+ 300));
+ this.datasetLockCondition = datasetLock.newCondition();
+
// The number of volumes required for operation is the total number
// of volumes minus the number of failed volumes we can tolerate.
volFailuresTolerated = datanode.getDnConf().getVolFailuresTolerated();
@@ -515,7 +527,7 @@ class FsDatasetImpl implements FsDatasetSpi {
// Disable the volume from the service.
asyncDiskService.removeVolume(sd.getCurrentDir());
volumes.removeVolume(absRoot, clearFailure);
- volumes.waitVolumeRemoved(5000, this);
+ volumes.waitVolumeRemoved(5000, datasetLockCondition);
// Removed all replica information for the blocks on the volume.
// Unlike updating the volumeMap in addVolume(), this operation does
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
index ea4d5975cd0..634ad42d89c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsVolumeList.java
@@ -31,6 +31,8 @@ import java.util.Map;
import java.util.TreeMap;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.StorageType;
@@ -41,6 +43,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.VolumeChoosingPolicy;
import org.apache.hadoop.hdfs.server.datanode.BlockScanner;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.util.AutoCloseableLock;
import org.apache.hadoop.util.DiskChecker.DiskErrorException;
import org.apache.hadoop.util.Time;
@@ -52,7 +55,8 @@ class FsVolumeList {
Collections.synchronizedMap(new TreeMap());
private final ConcurrentLinkedQueue volumesBeingRemoved =
new ConcurrentLinkedQueue<>();
- private Object checkDirsMutex = new Object();
+ private final AutoCloseableLock checkDirsLock;
+ private final Condition checkDirsLockCondition;
private final VolumeChoosingPolicy blockChooser;
private final BlockScanner blockScanner;
@@ -62,6 +66,8 @@ class FsVolumeList {
VolumeChoosingPolicy blockChooser) {
this.blockChooser = blockChooser;
this.blockScanner = blockScanner;
+ this.checkDirsLock = new AutoCloseableLock();
+ this.checkDirsLockCondition = checkDirsLock.newCondition();
for (VolumeFailureInfo volumeFailureInfo: initialVolumeFailureInfos) {
volumeFailureInfos.put(volumeFailureInfo.getFailedStorageLocation(),
volumeFailureInfo);
@@ -224,12 +230,12 @@ class FsVolumeList {
/**
* Calls {@link FsVolumeImpl#checkDirs()} on each volume.
*
- * Use checkDirsMutext to allow only one instance of checkDirs() call
+ * Use {@link #checkDirsLock} to allow only one instance of checkDirs()
+ * to run at a time.
*
* @return list of all the failed volumes.
*/
Set checkDirs() {
- synchronized(checkDirsMutex) {
+ try (AutoCloseableLock lock = checkDirsLock.acquire()) {
Set failedVols = null;
// Make a copy of volumes for performing modification
@@ -260,7 +266,7 @@ class FsVolumeList {
+ " failure volumes.");
}
- waitVolumeRemoved(5000, checkDirsMutex);
+ waitVolumeRemoved(5000, checkDirsLockCondition);
return failedVols;
}
}
@@ -271,13 +277,13 @@ class FsVolumeList {
*
* @param sleepMillis interval to recheck.
*/
- void waitVolumeRemoved(int sleepMillis, Object monitor) {
+ void waitVolumeRemoved(int sleepMillis, Condition condition) {
while (!checkVolumesRemoved()) {
if (FsDatasetImpl.LOG.isDebugEnabled()) {
FsDatasetImpl.LOG.debug("Waiting for volume reference to be released.");
}
try {
- monitor.wait(sleepMillis);
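+ // A timeout or spurious wakeup here is harmless: the enclosing while
+ // loop re-checks checkVolumesRemoved() before waiting again.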
+ condition.await(sleepMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
FsDatasetImpl.LOG.info("Thread interrupted when waiting for "
+ "volume reference to be released.");
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
index a420b047354..95ff722a2fa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/DiskBalancerException.java
@@ -38,7 +38,8 @@ public class DiskBalancerException extends IOException {
INVALID_MOVE,
INTERNAL_ERROR,
NO_SUCH_PLAN,
- UNKNOWN_KEY
+ UNKNOWN_KEY,
+ INVALID_NODE,
}
private final Result result;
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/CancelCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/CancelCommand.java
index 8b83e270f23..007272eda9e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/CancelCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/CancelCommand.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
-import org.apache.hadoop.hdfs.tools.DiskBalancer;
+import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import java.io.IOException;
@@ -44,9 +44,10 @@ public class CancelCommand extends Command {
*/
public CancelCommand(Configuration conf) {
super(conf);
- addValidCommandParameters(DiskBalancer.CANCEL, "Cancels a running plan.");
- addValidCommandParameters(DiskBalancer.NODE, "Node to run the command " +
- "against in node:port format.");
+ addValidCommandParameters(DiskBalancerCLI.CANCEL,
+ "Cancels a running plan.");
+ addValidCommandParameters(DiskBalancerCLI.NODE,
+ "Node to run the command against in node:port format.");
}
/**
@@ -57,20 +58,20 @@ public class CancelCommand extends Command {
@Override
public void execute(CommandLine cmd) throws Exception {
LOG.info("Executing \"Cancel plan\" command.");
- Preconditions.checkState(cmd.hasOption(DiskBalancer.CANCEL));
- verifyCommandOptions(DiskBalancer.CANCEL, cmd);
+ Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.CANCEL));
+ verifyCommandOptions(DiskBalancerCLI.CANCEL, cmd);
// We can cancel a plan using datanode address and plan ID
// that you can read from a datanode using queryStatus
- if(cmd.hasOption(DiskBalancer.NODE)) {
- String nodeAddress = cmd.getOptionValue(DiskBalancer.NODE);
- String planHash = cmd.getOptionValue(DiskBalancer.CANCEL);
+ if(cmd.hasOption(DiskBalancerCLI.NODE)) {
+ String nodeAddress = cmd.getOptionValue(DiskBalancerCLI.NODE);
+ String planHash = cmd.getOptionValue(DiskBalancerCLI.CANCEL);
cancelPlanUsingHash(nodeAddress, planHash);
} else {
// Or you can cancel a plan using the plan file. If the user
// points us to the plan file, we can compute the hash as well as read
// the address of the datanode from the plan file.
- String planFile = cmd.getOptionValue(DiskBalancer.CANCEL);
+ String planFile = cmd.getOptionValue(DiskBalancerCLI.CANCEL);
Preconditions.checkArgument(planFile != null && !planFile.isEmpty(),
"Invalid plan file specified.");
String planData = null;
@@ -142,6 +143,6 @@ public class CancelCommand extends Command {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("hdfs diskbalancer -cancel | -cancel " +
" -node ",
- header, DiskBalancer.getCancelOptions(), footer);
+ header, DiskBalancerCLI.getCancelOptions(), footer);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
index 5acd0aca3e4..24976694f39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/Command.java
@@ -37,13 +37,14 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerConstants;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ClusterConnector;
import org.apache.hadoop.hdfs.server.diskbalancer.connectors.ConnectorFactory;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerCluster;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
-import org.apache.hadoop.hdfs.tools.DiskBalancer;
+import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.codehaus.jackson.map.ObjectMapper;
@@ -256,6 +257,7 @@ public abstract class Command extends Configured {
throws IOException {
Set nodeNames = null;
List nodeList = Lists.newArrayList();
+ List<String> invalidNodeList = Lists.newArrayList();
if ((listArg == null) || listArg.isEmpty()) {
return nodeList;
@@ -269,10 +271,22 @@ public abstract class Command extends Configured {
if (node != null) {
nodeList.add(node);
+ } else {
+ invalidNodeList.add(name);
}
}
}
+ if (!invalidNodeList.isEmpty()) {
+ String invalidNodes = StringUtils.join(invalidNodeList.toArray(), ",");
+ String warnMsg = String.format(
+ "The node(s) '%s' not found. "
+ + "Please make sure that '%s' exists in the cluster.",
+ invalidNodes, invalidNodes);
+ throw new DiskBalancerException(warnMsg,
+ DiskBalancerException.Result.INVALID_NODE);
+ }
+
return nodeList;
}
@@ -418,7 +432,7 @@ public abstract class Command extends Configured {
* @return default top number of nodes.
*/
protected int getDefaultTop() {
- return DiskBalancer.DEFAULT_TOP;
+ return DiskBalancerCLI.DEFAULT_TOP;
}
/**
@@ -437,7 +451,7 @@ public abstract class Command extends Configured {
protected int parseTopNodes(final CommandLine cmd, final StrBuilder result) {
String outputLine = "";
int nodes = 0;
- final String topVal = cmd.getOptionValue(DiskBalancer.TOP);
+ final String topVal = cmd.getOptionValue(DiskBalancerCLI.TOP);
if (StringUtils.isBlank(topVal)) {
outputLine = String.format(
"No top limit specified, using default top value %d.",
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
index f363c340fa2..3a348c9facb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ExecuteCommand.java
@@ -29,7 +29,7 @@ import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
-import org.apache.hadoop.hdfs.tools.DiskBalancer;
+import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import java.io.IOException;
@@ -46,7 +46,8 @@ public class ExecuteCommand extends Command {
*/
public ExecuteCommand(Configuration conf) {
super(conf);
- addValidCommandParameters(DiskBalancer.EXECUTE, "Executes a given plan.");
+ addValidCommandParameters(DiskBalancerCLI.EXECUTE,
+ "Executes a given plan.");
}
/**
@@ -57,10 +58,10 @@ public class ExecuteCommand extends Command {
@Override
public void execute(CommandLine cmd) throws Exception {
LOG.info("Executing \"execute plan\" command");
- Preconditions.checkState(cmd.hasOption(DiskBalancer.EXECUTE));
- verifyCommandOptions(DiskBalancer.EXECUTE, cmd);
+ Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.EXECUTE));
+ verifyCommandOptions(DiskBalancerCLI.EXECUTE, cmd);
- String planFile = cmd.getOptionValue(DiskBalancer.EXECUTE);
+ String planFile = cmd.getOptionValue(DiskBalancerCLI.EXECUTE);
Preconditions.checkArgument(planFile != null && !planFile.isEmpty(),
"Invalid plan file specified.");
@@ -88,7 +89,7 @@ public class ExecuteCommand extends Command {
String planHash = DigestUtils.shaHex(planData);
try {
// TODO : Support skipping date check.
- dataNode.submitDiskBalancerPlan(planHash, DiskBalancer.PLAN_VERSION,
+ dataNode.submitDiskBalancerPlan(planHash, DiskBalancerCLI.PLAN_VERSION,
planFile, planData, false);
} catch (DiskBalancerException ex) {
LOG.error("Submitting plan on {} failed. Result: {}, Message: {}",
@@ -111,6 +112,6 @@ public class ExecuteCommand extends Command {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("hdfs diskbalancer -execute ",
- header, DiskBalancer.getExecuteOptions(), footer);
+ header, DiskBalancerCLI.getExecuteOptions(), footer);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/HelpCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/HelpCommand.java
index 3c2fd0cf7db..c7352997e2b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/HelpCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/HelpCommand.java
@@ -23,7 +23,7 @@ import com.google.common.base.Preconditions;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.HelpFormatter;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hdfs.tools.DiskBalancer;
+import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
/**
* Help Command prints out detailed help about each command.
@@ -37,7 +37,7 @@ public class HelpCommand extends Command {
*/
public HelpCommand(Configuration conf) {
super(conf);
- addValidCommandParameters(DiskBalancer.HELP, "Help Command");
+ addValidCommandParameters(DiskBalancerCLI.HELP, "Help Command");
}
/**
@@ -53,9 +53,9 @@ public class HelpCommand extends Command {
return;
}
- Preconditions.checkState(cmd.hasOption(DiskBalancer.HELP));
- verifyCommandOptions(DiskBalancer.HELP, cmd);
- String helpCommand = cmd.getOptionValue(DiskBalancer.HELP);
+ Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.HELP));
+ verifyCommandOptions(DiskBalancerCLI.HELP, cmd);
+ String helpCommand = cmd.getOptionValue(DiskBalancerCLI.HELP);
if (helpCommand == null || helpCommand.isEmpty()) {
this.printHelp();
return;
@@ -65,19 +65,19 @@ public class HelpCommand extends Command {
helpCommand = helpCommand.toLowerCase();
Command command = null;
switch (helpCommand) {
- case DiskBalancer.PLAN:
+ case DiskBalancerCLI.PLAN:
command = new PlanCommand(getConf());
break;
- case DiskBalancer.EXECUTE:
+ case DiskBalancerCLI.EXECUTE:
command = new ExecuteCommand(getConf());
break;
- case DiskBalancer.QUERY:
+ case DiskBalancerCLI.QUERY:
command = new QueryCommand(getConf());
break;
- case DiskBalancer.CANCEL:
+ case DiskBalancerCLI.CANCEL:
command = new CancelCommand(getConf());
break;
- case DiskBalancer.REPORT:
+ case DiskBalancerCLI.REPORT:
command = new ReportCommand(getConf(), null);
break;
default:
@@ -102,7 +102,7 @@ public class HelpCommand extends Command {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("hdfs diskbalancer [command] [options]",
- header, DiskBalancer.getHelpOptions(), "");
+ header, DiskBalancerCLI.getHelpOptions(), "");
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
index 72ad2c6bdcb..97494097e6d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/PlanCommand.java
@@ -28,7 +28,7 @@ import org.apache.hadoop.hdfs.server.diskbalancer.datamodel
.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.NodePlan;
import org.apache.hadoop.hdfs.server.diskbalancer.planner.Step;
-import org.apache.hadoop.hdfs.tools.DiskBalancer;
+import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import java.nio.charset.StandardCharsets;
import java.util.List;
@@ -53,18 +53,18 @@ public class PlanCommand extends Command {
this.thresholdPercentage = 1;
this.bandwidth = 0;
this.maxError = 0;
- addValidCommandParameters(DiskBalancer.OUTFILE, "Output directory in " +
+ addValidCommandParameters(DiskBalancerCLI.OUTFILE, "Output directory in " +
"HDFS. The generated plan will be written to a file in this " +
"directory.");
- addValidCommandParameters(DiskBalancer.BANDWIDTH, "Maximum Bandwidth to " +
- "be used while copying.");
- addValidCommandParameters(DiskBalancer.THRESHOLD, "Percentage skew that " +
- "we tolerate before diskbalancer starts working.");
- addValidCommandParameters(DiskBalancer.MAXERROR, "Max errors to tolerate " +
- "between 2 disks");
- addValidCommandParameters(DiskBalancer.VERBOSE, "Run plan command in " +
+ addValidCommandParameters(DiskBalancerCLI.BANDWIDTH,
+ "Maximum Bandwidth to be used while copying.");
+ addValidCommandParameters(DiskBalancerCLI.THRESHOLD,
+ "Percentage skew that we tolerate before diskbalancer starts working.");
+ addValidCommandParameters(DiskBalancerCLI.MAXERROR,
+ "Max errors to tolerate between 2 disks");
+ addValidCommandParameters(DiskBalancerCLI.VERBOSE, "Run plan command in " +
"verbose mode.");
- addValidCommandParameters(DiskBalancer.PLAN, "Plan Command");
+ addValidCommandParameters(DiskBalancerCLI.PLAN, "Plan Command");
}
/**
@@ -77,36 +77,37 @@ public class PlanCommand extends Command {
@Override
public void execute(CommandLine cmd) throws Exception {
LOG.debug("Processing Plan Command.");
- Preconditions.checkState(cmd.hasOption(DiskBalancer.PLAN));
- verifyCommandOptions(DiskBalancer.PLAN, cmd);
+ Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.PLAN));
+ verifyCommandOptions(DiskBalancerCLI.PLAN, cmd);
- if (cmd.getOptionValue(DiskBalancer.PLAN) == null) {
+ if (cmd.getOptionValue(DiskBalancerCLI.PLAN) == null) {
throw new IllegalArgumentException("A node name is required to create a" +
" plan.");
}
- if (cmd.hasOption(DiskBalancer.BANDWIDTH)) {
- this.bandwidth = Integer.parseInt(cmd.getOptionValue(DiskBalancer
+ if (cmd.hasOption(DiskBalancerCLI.BANDWIDTH)) {
+ this.bandwidth = Integer.parseInt(cmd.getOptionValue(DiskBalancerCLI
.BANDWIDTH));
}
- if (cmd.hasOption(DiskBalancer.MAXERROR)) {
- this.maxError = Integer.parseInt(cmd.getOptionValue(DiskBalancer
+ if (cmd.hasOption(DiskBalancerCLI.MAXERROR)) {
+ this.maxError = Integer.parseInt(cmd.getOptionValue(DiskBalancerCLI
.MAXERROR));
}
readClusterInfo(cmd);
String output = null;
- if (cmd.hasOption(DiskBalancer.OUTFILE)) {
- output = cmd.getOptionValue(DiskBalancer.OUTFILE);
+ if (cmd.hasOption(DiskBalancerCLI.OUTFILE)) {
+ output = cmd.getOptionValue(DiskBalancerCLI.OUTFILE);
}
setOutputPath(output);
// -plan nodename is the command line argument.
- DiskBalancerDataNode node = getNode(cmd.getOptionValue(DiskBalancer.PLAN));
+ DiskBalancerDataNode node =
+ getNode(cmd.getOptionValue(DiskBalancerCLI.PLAN));
if (node == null) {
throw new IllegalArgumentException("Unable to find the specified node. " +
- cmd.getOptionValue(DiskBalancer.PLAN));
+ cmd.getOptionValue(DiskBalancerCLI.PLAN));
}
this.thresholdPercentage = getThresholdPercentage(cmd);
@@ -124,8 +125,8 @@ public class PlanCommand extends Command {
try (FSDataOutputStream beforeStream = create(String.format(
- DiskBalancer.BEFORE_TEMPLATE,
- cmd.getOptionValue(DiskBalancer.PLAN)))) {
+ DiskBalancerCLI.BEFORE_TEMPLATE,
+ cmd.getOptionValue(DiskBalancerCLI.PLAN)))) {
beforeStream.write(getCluster().toJson()
.getBytes(StandardCharsets.UTF_8));
}
@@ -133,17 +134,17 @@ public class PlanCommand extends Command {
if (plan != null && plan.getVolumeSetPlans().size() > 0) {
LOG.info("Writing plan to : {}", getOutputPath());
try (FSDataOutputStream planStream = create(String.format(
- DiskBalancer.PLAN_TEMPLATE,
- cmd.getOptionValue(DiskBalancer.PLAN)))) {
+ DiskBalancerCLI.PLAN_TEMPLATE,
+ cmd.getOptionValue(DiskBalancerCLI.PLAN)))) {
planStream.write(plan.toJson().getBytes(StandardCharsets.UTF_8));
}
} else {
LOG.info("No plan generated. DiskBalancing not needed for node: {} " +
- "threshold used: {}", cmd.getOptionValue(DiskBalancer.PLAN),
+ "threshold used: {}", cmd.getOptionValue(DiskBalancerCLI.PLAN),
this.thresholdPercentage);
}
- if (cmd.hasOption(DiskBalancer.VERBOSE) && plans.size() > 0) {
+ if (cmd.hasOption(DiskBalancerCLI.VERBOSE) && plans.size() > 0) {
printToScreen(plans);
}
}
@@ -162,8 +163,8 @@ public class PlanCommand extends Command {
" will balance the data.";
HelpFormatter helpFormatter = new HelpFormatter();
- helpFormatter.printHelp("hdfs diskbalancer -plan " +
- " [options]", header, DiskBalancer.getPlanOptions(), footer);
+ helpFormatter.printHelp("hdfs diskbalancer -plan [options]",
+ header, DiskBalancerCLI.getPlanOptions(), footer);
}
/**
@@ -174,8 +175,8 @@ public class PlanCommand extends Command {
*/
private double getThresholdPercentage(CommandLine cmd) {
Double value = 0.0;
- if (cmd.hasOption(DiskBalancer.THRESHOLD)) {
- value = Double.parseDouble(cmd.getOptionValue(DiskBalancer.THRESHOLD));
+ if (cmd.hasOption(DiskBalancerCLI.THRESHOLD)) {
+ value = Double.parseDouble(cmd.getOptionValue(DiskBalancerCLI.THRESHOLD));
}
if ((value <= 0.0) || (value > 100.0)) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java
index 1557a023f30..a8adcbd5621 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/QueryCommand.java
@@ -27,7 +27,7 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ClientDatanodeProtocol;
import org.apache.hadoop.hdfs.server.datanode.DiskBalancerWorkStatus;
import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
-import org.apache.hadoop.hdfs.tools.DiskBalancer;
+import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import org.apache.hadoop.net.NetUtils;
/**
@@ -42,9 +42,10 @@ public class QueryCommand extends Command {
*/
public QueryCommand(Configuration conf) {
super(conf);
- addValidCommandParameters(DiskBalancer.QUERY, "Queries the status of disk" +
- " plan running on a given datanode.");
- addValidCommandParameters(DiskBalancer.VERBOSE, "Prints verbose results.");
+ addValidCommandParameters(DiskBalancerCLI.QUERY,
+ "Queries the status of disk plan running on a given datanode.");
+ addValidCommandParameters(DiskBalancerCLI.VERBOSE,
+ "Prints verbose results.");
}
/**
@@ -55,9 +56,9 @@ public class QueryCommand extends Command {
@Override
public void execute(CommandLine cmd) throws Exception {
LOG.info("Executing \"query plan\" command.");
- Preconditions.checkState(cmd.hasOption(DiskBalancer.QUERY));
- verifyCommandOptions(DiskBalancer.QUERY, cmd);
- String nodeName = cmd.getOptionValue(DiskBalancer.QUERY);
+ Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.QUERY));
+ verifyCommandOptions(DiskBalancerCLI.QUERY, cmd);
+ String nodeName = cmd.getOptionValue(DiskBalancerCLI.QUERY);
Preconditions.checkNotNull(nodeName);
nodeName = nodeName.trim();
String nodeAddress = nodeName;
@@ -79,7 +80,7 @@ public class QueryCommand extends Command {
workStatus.getPlanID(),
workStatus.getResult().toString());
- if (cmd.hasOption(DiskBalancer.VERBOSE)) {
+ if (cmd.hasOption(DiskBalancerCLI.VERBOSE)) {
System.out.printf("%s", workStatus.currentStateString());
}
} catch (DiskBalancerException ex) {
@@ -101,6 +102,6 @@ public class QueryCommand extends Command {
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("hdfs diskbalancer -query [options]",
- header, DiskBalancer.getQueryOptions(), footer);
+ header, DiskBalancerCLI.getQueryOptions(), footer);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java
index 18dd77eacc2..79ba14f5fe3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/diskbalancer/command/ReportCommand.java
@@ -27,10 +27,11 @@ import org.apache.commons.cli.HelpFormatter;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.text.StrBuilder;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.server.diskbalancer.DiskBalancerException;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerDataNode;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolume;
import org.apache.hadoop.hdfs.server.diskbalancer.datamodel.DiskBalancerVolumeSet;
-import org.apache.hadoop.hdfs.tools.DiskBalancer;
+import org.apache.hadoop.hdfs.tools.DiskBalancerCLI;
import com.google.common.base.Preconditions;
import com.google.common.collect.Lists;
@@ -52,15 +53,15 @@ public class ReportCommand extends Command {
super(conf);
this.out = out;
- addValidCommandParameters(DiskBalancer.REPORT,
+ addValidCommandParameters(DiskBalancerCLI.REPORT,
"Report volume information of nodes.");
String desc = String.format(
"Top number of nodes to be processed. Default: %d", getDefaultTop());
- addValidCommandParameters(DiskBalancer.TOP, desc);
+ addValidCommandParameters(DiskBalancerCLI.TOP, desc);
- desc = String.format("Print out volume information for a DataNode.");
- addValidCommandParameters(DiskBalancer.NODE, desc);
+ desc = String.format("Print out volume information for DataNode(s).");
+ addValidCommandParameters(DiskBalancerCLI.NODE, desc);
}
@Override
@@ -69,8 +70,8 @@ public class ReportCommand extends Command {
String outputLine = "Processing report command";
recordOutput(result, outputLine);
- Preconditions.checkState(cmd.hasOption(DiskBalancer.REPORT));
- verifyCommandOptions(DiskBalancer.REPORT, cmd);
+ Preconditions.checkState(cmd.hasOption(DiskBalancerCLI.REPORT));
+ verifyCommandOptions(DiskBalancerCLI.REPORT, cmd);
readClusterInfo(cmd);
final String nodeFormat =
@@ -81,9 +82,9 @@ public class ReportCommand extends Command {
"[%s: volume-%s] - %.2f used: %d/%d, %.2f free: %d/%d, "
+ "isFailed: %s, isReadOnly: %s, isSkip: %s, isTransient: %s.";
- if (cmd.hasOption(DiskBalancer.NODE)) {
+ if (cmd.hasOption(DiskBalancerCLI.NODE)) {
/*
- * Reporting volume information for a specific DataNode
+ * Reporting volume information for specific DataNode(s)
*/
handleNodeReport(cmd, result, nodeFormatWithoutSequence, volumeFormat);
@@ -133,84 +134,100 @@ public class ReportCommand extends Command {
final String nodeFormat, final String volumeFormat) throws Exception {
String outputLine = "";
/*
- * get value that identifies a DataNode from command line, it could be UUID,
- * IP address or host name.
+ * Get the value that identifies DataNode(s) from the command line; it
+ * could be a UUID, IP address or host name.
*/
- final String nodeVal = cmd.getOptionValue(DiskBalancer.NODE);
+ final String nodeVal = cmd.getOptionValue(DiskBalancerCLI.NODE);
if (StringUtils.isBlank(nodeVal)) {
outputLine = "The value for '-node' is neither specified or empty.";
recordOutput(result, outputLine);
} else {
/*
- * Reporting volume information for a specific DataNode
+ * Reporting volume information for specific DataNode(s)
*/
outputLine = String.format(
- "Reporting volume information for DataNode '%s'.", nodeVal);
+ "Reporting volume information for DataNode(s) '%s'.", nodeVal);
recordOutput(result, outputLine);
- final String trueStr = "True";
- final String falseStr = "False";
- DiskBalancerDataNode dbdn = getNode(nodeVal);
- // get storage path of datanode
- populatePathNames(dbdn);
+ List<DiskBalancerDataNode> dbdns = Lists.newArrayList();
+ try {
+ dbdns = getNodes(nodeVal);
+ } catch (DiskBalancerException e) {
+ // If nodeVal contains any invalid node identifiers, an exception
+ // will be thrown.
+ recordOutput(result, e.getMessage());
+ return;
+ }
- if (dbdn == null) {
- outputLine = String.format(
- "Can't find a DataNode that matches '%s'.", nodeVal);
- recordOutput(result, outputLine);
- } else {
- result.appendln(String.format(nodeFormat,
- dbdn.getDataNodeName(),
- dbdn.getDataNodeIP(),
- dbdn.getDataNodePort(),
- dbdn.getDataNodeUUID(),
- dbdn.getVolumeCount(),
- dbdn.getNodeDataDensity()));
-
- List volumeList = Lists.newArrayList();
- for (DiskBalancerVolumeSet vset : dbdn.getVolumeSets().values()) {
- for (DiskBalancerVolume vol : vset.getVolumes()) {
- volumeList.add(String.format(volumeFormat,
- vol.getStorageType(),
- vol.getPath(),
- vol.getUsedRatio(),
- vol.getUsed(),
- vol.getCapacity(),
- vol.getFreeRatio(),
- vol.getFreeSpace(),
- vol.getCapacity(),
- vol.isFailed() ? trueStr : falseStr,
- vol.isReadOnly() ? trueStr : falseStr,
- vol.isSkip() ? trueStr : falseStr,
- vol.isTransient() ? trueStr : falseStr));
- }
+ if (!dbdns.isEmpty()) {
+ for (DiskBalancerDataNode node : dbdns) {
+ recordNodeReport(result, node, nodeFormat, volumeFormat);
+ result.append(System.lineSeparator());
}
-
- Collections.sort(volumeList);
- result.appendln(
- StringUtils.join(volumeList.toArray(), System.lineSeparator()));
}
}
}
+ /**
+ * Append the report lines for a single DataNode to the result buffer.
+ */
+ private void recordNodeReport(StrBuilder result, DiskBalancerDataNode dbdn,
+ final String nodeFormat, final String volumeFormat) throws Exception {
+ final String trueStr = "True";
+ final String falseStr = "False";
+
+ // get storage path of datanode
+ populatePathNames(dbdn);
+ result.appendln(String.format(nodeFormat,
+ dbdn.getDataNodeName(),
+ dbdn.getDataNodeIP(),
+ dbdn.getDataNodePort(),
+ dbdn.getDataNodeUUID(),
+ dbdn.getVolumeCount(),
+ dbdn.getNodeDataDensity()));
+
+ List<String> volumeList = Lists.newArrayList();
+ for (DiskBalancerVolumeSet vset : dbdn.getVolumeSets().values()) {
+ for (DiskBalancerVolume vol : vset.getVolumes()) {
+ volumeList.add(String.format(volumeFormat,
+ vol.getStorageType(),
+ vol.getPath(),
+ vol.getUsedRatio(),
+ vol.getUsed(),
+ vol.getCapacity(),
+ vol.getFreeRatio(),
+ vol.getFreeSpace(),
+ vol.getCapacity(),
+ vol.isFailed() ? trueStr : falseStr,
+ vol.isReadOnly() ? trueStr : falseStr,
+ vol.isSkip() ? trueStr : falseStr,
+ vol.isTransient() ? trueStr : falseStr));
+ }
+ }
+
+ Collections.sort(volumeList);
+ result.appendln(
+ StringUtils.join(volumeList.toArray(), System.lineSeparator()));
+ }
+
/**
* Prints the help message.
*/
@Override
public void printHelp() {
- String header = "Report command reports the volume information of a given" +
- " datanode, or prints out the list of nodes that will benefit from " +
- "running disk balancer. Top defaults to " + getDefaultTop();
+ String header = "Report command reports the volume information of given" +
+ " datanode(s), or prints out the list of nodes that will benefit " +
+ "from running disk balancer. Top defaults to " + getDefaultTop();
String footer = ". E.g.:\n"
+ "hdfs diskbalancer -report\n"
+ "hdfs diskbalancer -report -top 5\n"
+ "hdfs diskbalancer -report "
- + "-node {DataNodeID | IP | Hostname}";
+ + "-node [,...]";
HelpFormatter helpFormatter = new HelpFormatter();
helpFormatter.printHelp("hdfs diskbalancer -fs http://namenode.uri " +
"-report [options]",
- header, DiskBalancer.getReportOptions(), footer);
+ header, DiskBalancerCLI.getReportOptions(), footer);
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
index 57f7cb197b6..6b529498686 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java
@@ -168,6 +168,7 @@ import org.apache.hadoop.ipc.RetryCache.CacheEntry;
import org.apache.hadoop.ipc.RetryCache.CacheEntryWithPayload;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.ipc.StandbyException;
+import org.apache.hadoop.ipc.WritableRpcEngine;
import org.apache.hadoop.ipc.RefreshRegistry;
import org.apache.hadoop.ipc.RefreshResponse;
import org.apache.hadoop.net.Node;
@@ -316,6 +317,8 @@ public class NameNodeRpcServer implements NamenodeProtocols {
new TraceAdminProtocolServerSideTranslatorPB(this);
BlockingService traceAdminService = TraceAdminService
.newReflectiveBlockingService(traceAdminXlator);
+
+ WritableRpcEngine.ensureInitialized();
InetSocketAddress serviceRpcAddr = nn.getServiceRpcServerAddress(conf);
if (serviceRpcAddr != null) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java
similarity index 96%
rename from hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
rename to hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java
index 1ed2fdcea2f..e961c149d83 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DiskBalancerCLI.java
@@ -50,7 +50,7 @@ import java.io.PrintStream;
* At very high level diskbalancer computes a set of moves that will make disk
* utilization equal and then those moves are executed by the datanode.
*/
-public class DiskBalancer extends Configured implements Tool {
+public class DiskBalancerCLI extends Configured implements Tool {
/**
* Computes a plan for a given set of nodes.
*/
@@ -126,7 +126,7 @@ public class DiskBalancer extends Configured implements Tool {
*/
public static final String PLAN_TEMPLATE = "%s.plan.json";
private static final Logger LOG =
- LoggerFactory.getLogger(DiskBalancer.class);
+ LoggerFactory.getLogger(DiskBalancerCLI.class);
private static final Options PLAN_OPTIONS = new Options();
private static final Options EXECUTE_OPTIONS = new Options();
@@ -140,7 +140,7 @@ public class DiskBalancer extends Configured implements Tool {
*
* @param conf
*/
- public DiskBalancer(Configuration conf) {
+ public DiskBalancerCLI(Configuration conf) {
super(conf);
}
@@ -151,7 +151,7 @@ public class DiskBalancer extends Configured implements Tool {
* @throws Exception
*/
public static void main(String[] argv) throws Exception {
- DiskBalancer shell = new DiskBalancer(new HdfsConfiguration());
+ DiskBalancerCLI shell = new DiskBalancerCLI(new HdfsConfiguration());
int res = 0;
try {
res = ToolRunner.run(shell, argv);
@@ -446,27 +446,27 @@ public class DiskBalancer extends Configured implements Tool {
private int dispatch(CommandLine cmd, Options opts, final PrintStream out)
throws Exception {
Command currentCommand = null;
- if (cmd.hasOption(DiskBalancer.PLAN)) {
+ if (cmd.hasOption(DiskBalancerCLI.PLAN)) {
currentCommand = new PlanCommand(getConf());
}
- if (cmd.hasOption(DiskBalancer.EXECUTE)) {
+ if (cmd.hasOption(DiskBalancerCLI.EXECUTE)) {
currentCommand = new ExecuteCommand(getConf());
}
- if (cmd.hasOption(DiskBalancer.QUERY)) {
+ if (cmd.hasOption(DiskBalancerCLI.QUERY)) {
currentCommand = new QueryCommand(getConf());
}
- if (cmd.hasOption(DiskBalancer.CANCEL)) {
+ if (cmd.hasOption(DiskBalancerCLI.CANCEL)) {
currentCommand = new CancelCommand(getConf());
}
- if (cmd.hasOption(DiskBalancer.REPORT)) {
+ if (cmd.hasOption(DiskBalancerCLI.REPORT)) {
currentCommand = new ReportCommand(getConf(), out);
}
- if (cmd.hasOption(DiskBalancer.HELP)) {
+ if (cmd.hasOption(DiskBalancerCLI.HELP)) {
currentCommand = new HelpCommand(getConf());
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java
index 33ab641e54c..71fb822eed6 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionCalculator.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.server.namenode.FSImageUtil;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.FileSummary;
import org.apache.hadoop.hdfs.server.namenode.FsImageProto.INodeSection;
import org.apache.hadoop.util.LimitInputStream;
+import org.apache.hadoop.util.StringUtils;
import com.google.common.base.Preconditions;
@@ -75,11 +76,14 @@ final class FileDistributionCalculator {
private long totalSpace;
private long maxFileSize;
+ private boolean formatOutput = false;
+
FileDistributionCalculator(Configuration conf, long maxSize, int steps,
- PrintStream out) {
+ boolean formatOutput, PrintStream out) {
this.conf = conf;
this.maxSize = maxSize == 0 ? MAX_SIZE_DEFAULT : maxSize;
this.steps = steps == 0 ? INTERVAL_DEFAULT : steps;
+ this.formatOutput = formatOutput;
this.out = out;
long numIntervals = this.maxSize / this.steps;
// avoid OutOfMemoryError when allocating an array
@@ -148,10 +152,20 @@ final class FileDistributionCalculator {
private void output() {
// write the distribution into the output file
- out.print("Size\tNumFiles\n");
+ out.print((formatOutput ? "Size Range" : "Size") + "\tNumFiles\n");
for (int i = 0; i < distribution.length; i++) {
if (distribution[i] != 0) {
- out.print(((long) i * steps) + "\t" + distribution[i]);
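+ // When formatOutput is set, print the bucket as a human-readable
+ // range, e.g. "(2 MB, 4 MB]\t<count>" with the default 2 MB step.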
+ if (formatOutput) {
+ out.print((i == 0 ? "[" : "(")
+ + StringUtils.byteDesc(((long) (i == 0 ? 0 : i - 1) * steps))
+ + ", "
+ + StringUtils.byteDesc((long)
+ (i == distribution.length - 1 ? maxFileSize : i * steps))
+ + "]\t" + distribution[i]);
+ } else {
+ out.print(((long) i * steps) + "\t" + distribution[i]);
+ }
+
out.print('\n');
}
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionVisitor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionVisitor.java
index 1cef720624d..7dcc29998f3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionVisitor.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/FileDistributionVisitor.java
@@ -20,6 +20,8 @@ package org.apache.hadoop.hdfs.tools.offlineImageViewer;
import java.io.IOException;
import java.util.LinkedList;
+import org.apache.hadoop.util.StringUtils;
+
/**
* File size distribution visitor.
*
@@ -67,6 +69,7 @@ class FileDistributionVisitor extends TextWriterImageVisitor {
private FileContext current;
private boolean inInode = false;
+ private boolean formatOutput = false;
/**
* File or directory information.
@@ -78,12 +81,12 @@ class FileDistributionVisitor extends TextWriterImageVisitor {
int replication;
}
- public FileDistributionVisitor(String filename,
- long maxSize,
- int step) throws IOException {
+ public FileDistributionVisitor(String filename, long maxSize, int step,
+ boolean formatOutput) throws IOException {
super(filename, false);
this.maxSize = (maxSize == 0 ? MAX_SIZE_DEFAULT : maxSize);
this.step = (step == 0 ? INTERVAL_DEFAULT : step);
+ this.formatOutput = formatOutput;
long numIntervals = this.maxSize / this.step;
if(numIntervals >= Integer.MAX_VALUE)
throw new IOException("Too many distribution intervals " + numIntervals);
@@ -113,9 +116,22 @@ class FileDistributionVisitor extends TextWriterImageVisitor {
private void output() throws IOException {
// write the distribution into the output file
- write("Size\tNumFiles\n");
- for(int i = 0; i < distribution.length; i++)
- write(((long)i * step) + "\t" + distribution[i] + "\n");
+ write((formatOutput ? "Size Range" : "Size") + "\tNumFiles\n");
+ for (int i = 0; i < distribution.length; i++) {
+ if (distribution[i] > 0) {
+ if (formatOutput) {
+ write((i == 0 ? "[" : "(")
+ + StringUtils.byteDesc(((long) (i == 0 ? 0 : i - 1) * step))
+ + ", "
+ + StringUtils.byteDesc((long)
+ (i == distribution.length - 1 ? maxFileSize : i * step))
+ + "]\t"
+ + distribution[i] + "\n");
+ } else {
+ write(((long) i * step) + "\t" + distribution[i] + "\n");
+ }
+ }
+ }
System.out.println("totalFiles = " + totalFiles);
System.out.println("totalDirectories = " + totalDirectories);
System.out.println("totalBlocks = " + totalBlocks);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java
index 770cde14855..c542d908b6e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewer.java
@@ -46,61 +46,63 @@ import org.apache.hadoop.hdfs.server.namenode.FSEditLogLoader.PositionTrackingIn
public class OfflineImageViewer {
public static final Log LOG = LogFactory.getLog(OfflineImageViewer.class);
- private final static String usage =
- "Usage: bin/hdfs oiv_legacy [OPTIONS] -i INPUTFILE -o OUTPUTFILE\n" +
- "Offline Image Viewer\n" +
- "View a Hadoop fsimage INPUTFILE using the specified PROCESSOR,\n" +
- "saving the results in OUTPUTFILE.\n" +
- "\n" +
- "The oiv utility will attempt to parse correctly formed image files\n" +
- "and will abort fail with mal-formed image files.\n" +
- "\n" +
- "The tool works offline and does not require a running cluster in\n" +
- "order to process an image file.\n" +
- "\n" +
- "The following image processors are available:\n" +
- " * Ls: The default image processor generates an lsr-style listing\n" +
- " of the files in the namespace, with the same fields in the same\n" +
- " order. Note that in order to correctly determine file sizes,\n" +
- " this formatter cannot skip blocks and will override the\n" +
- " -skipBlocks option.\n" +
- " * Indented: This processor enumerates over all of the elements in\n" +
- " the fsimage file, using levels of indentation to delineate\n" +
- " sections within the file.\n" +
- " * Delimited: Generate a text file with all of the elements common\n" +
- " to both inodes and inodes-under-construction, separated by a\n" +
- " delimiter. The default delimiter is \u0001, though this may be\n" +
- " changed via the -delimiter argument. This processor also overrides\n" +
- " the -skipBlocks option for the same reason as the Ls processor\n" +
- " * XML: This processor creates an XML document with all elements of\n" +
- " the fsimage enumerated, suitable for further analysis by XML\n" +
- " tools.\n" +
- " * FileDistribution: This processor analyzes the file size\n" +
- " distribution in the image.\n" +
- " -maxSize specifies the range [0, maxSize] of file sizes to be\n" +
- " analyzed (128GB by default).\n" +
- " -step defines the granularity of the distribution. (2MB by default)\n" +
- " * NameDistribution: This processor analyzes the file names\n" +
- " in the image and prints total number of file names and how frequently\n" +
- " file names are reused.\n" +
- "\n" +
- "Required command line arguments:\n" +
- "-i,--inputFile FSImage file to process.\n" +
- "-o,--outputFile Name of output file. If the specified\n" +
- " file exists, it will be overwritten.\n" +
- "\n" +
- "Optional command line arguments:\n" +
- "-p,--processor Select which type of processor to apply\n" +
- " against image file." +
- " (Ls|XML|Delimited|Indented|FileDistribution).\n" +
- "-h,--help Display usage information and exit\n" +
- "-printToScreen For processors that write to a file, also\n" +
- " output to screen. On large image files this\n" +
- " will dramatically increase processing time.\n" +
- "-skipBlocks Skip inodes' blocks information. May\n" +
- " significantly decrease output.\n" +
- " (default = false).\n" +
- "-delimiter Delimiting string to use with Delimited processor\n";
+ private final static String usage =
+ "Usage: bin/hdfs oiv_legacy [OPTIONS] -i INPUTFILE -o OUTPUTFILE\n"
+ + "Offline Image Viewer\n"
+ + "View a Hadoop fsimage INPUTFILE using the specified PROCESSOR,\n"
+ + "saving the results in OUTPUTFILE.\n"
+ + "\n"
+ + "The oiv utility will attempt to parse correctly formed image files\n"
+ + "and will abort fail with mal-formed image files.\n"
+ + "\n"
+ + "The tool works offline and does not require a running cluster in\n"
+ + "order to process an image file.\n"
+ + "\n"
+ + "The following image processors are available:\n"
+ + " * Ls: The default image processor generates an lsr-style listing\n"
+ + " of the files in the namespace, with the same fields in the same\n"
+ + " order. Note that in order to correctly determine file sizes,\n"
+ + " this formatter cannot skip blocks and will override the\n"
+ + " -skipBlocks option.\n"
+ + " * Indented: This processor enumerates over all of the elements in\n"
+ + " the fsimage file, using levels of indentation to delineate\n"
+ + " sections within the file.\n"
+ + " * Delimited: Generate a text file with all of the elements common\n"
+ + " to both inodes and inodes-under-construction, separated by a\n"
+ + " delimiter. The default delimiter is \u0001, though this may be\n"
+ + " changed via the -delimiter argument. This processor also overrides\n"
+ + " the -skipBlocks option for the same reason as the Ls processor\n"
+ + " * XML: This processor creates an XML document with all elements of\n"
+ + " the fsimage enumerated, suitable for further analysis by XML\n"
+ + " tools.\n"
+ + " * FileDistribution: This processor analyzes the file size\n"
+ + " distribution in the image.\n"
+ + " -maxSize specifies the range [0, maxSize] of file sizes to be\n"
+ + " analyzed (128GB by default).\n"
+ + " -step defines the granularity of the distribution. (2MB by default)\n"
+ + " -format formats the output result in a human-readable fashion\n"
+ + " rather than a number of bytes. (false by default)\n"
+ + " * NameDistribution: This processor analyzes the file names\n"
+ + " in the image and prints total number of file names and how frequently\n"
+ + " file names are reused.\n"
+ + "\n"
+ + "Required command line arguments:\n"
+ + "-i,--inputFile FSImage file to process.\n"
+ + "-o,--outputFile Name of output file. If the specified\n"
+ + " file exists, it will be overwritten.\n"
+ + "\n"
+ + "Optional command line arguments:\n"
+ + "-p,--processor Select which type of processor to apply\n"
+ + " against image file."
+ + " (Ls|XML|Delimited|Indented|FileDistribution).\n"
+ + "-h,--help Display usage information and exit\n"
+ + "-printToScreen For processors that write to a file, also\n"
+ + " output to screen. On large image files this\n"
+ + " will dramatically increase processing time.\n"
+ + "-skipBlocks Skip inodes' blocks information. May\n"
+ + " significantly decrease output.\n"
+ + " (default = false).\n"
+ + "-delimiter Delimiting string to use with Delimited processor\n";
private final boolean skipBlocks;
private final String inputFile;
@@ -188,6 +190,7 @@ public class OfflineImageViewer {
options.addOption("h", "help", false, "");
options.addOption("maxSize", true, "");
options.addOption("step", true, "");
+ options.addOption("format", false, "");
options.addOption("skipBlocks", false, "");
options.addOption("printToScreen", false, "");
options.addOption("delimiter", true, "");
@@ -253,7 +256,8 @@ public class OfflineImageViewer {
} else if (processor.equals("FileDistribution")) {
long maxSize = Long.parseLong(cmd.getOptionValue("maxSize", "0"));
int step = Integer.parseInt(cmd.getOptionValue("step", "0"));
- v = new FileDistributionVisitor(outputFile, maxSize, step);
+ boolean formatOutput = cmd.hasOption("format");
+ v = new FileDistributionVisitor(outputFile, maxSize, step, formatOutput);
} else if (processor.equals("NameDistribution")) {
v = new NameDistributionVisitor(outputFile, printToScreen);
} else {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
index b514b3f86bb..c1141f30aec 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/offlineImageViewer/OfflineImageViewerPB.java
@@ -67,6 +67,8 @@ public class OfflineImageViewerPB {
+ " -maxSize specifies the range [0, maxSize] of file sizes to be\n"
+ " analyzed (128GB by default).\n"
+ " -step defines the granularity of the distribution. (2MB by default)\n"
+ + " -format formats the output result in a human-readable fashion\n"
+ + " rather than a number of bytes. (false by default)\n"
+ " * Web: Run a viewer to expose read-only WebHDFS API.\n"
+ " -addr specifies the address to listen. (localhost:5978 by default)\n"
+ " * Delimited (experimental): Generate a text file with all of the elements common\n"
@@ -111,6 +113,7 @@ public class OfflineImageViewerPB {
options.addOption("h", "help", false, "");
options.addOption("maxSize", true, "");
options.addOption("step", true, "");
+ options.addOption("format", false, "");
options.addOption("addr", true, "");
options.addOption("delimiter", true, "");
options.addOption("t", "temp", true, "");
@@ -172,43 +175,44 @@ public class OfflineImageViewerPB {
try (PrintStream out = outputFile.equals("-") ?
System.out : new PrintStream(outputFile, "UTF-8")) {
switch (processor) {
- case "FileDistribution":
- long maxSize = Long.parseLong(cmd.getOptionValue("maxSize", "0"));
- int step = Integer.parseInt(cmd.getOptionValue("step", "0"));
- new FileDistributionCalculator(conf, maxSize, step, out).visit(
- new RandomAccessFile(inputFile, "r"));
- break;
- case "XML":
- new PBImageXmlWriter(conf, out).visit(
- new RandomAccessFile(inputFile, "r"));
- break;
- case "ReverseXML":
- try {
- OfflineImageReconstructor.run(inputFile, outputFile);
- } catch (Exception e) {
- System.err.println("OfflineImageReconstructor failed: " +
- e.getMessage());
- e.printStackTrace(System.err);
- System.exit(1);
- }
- break;
- case "Web":
- String addr = cmd.getOptionValue("addr", "localhost:5978");
- try (WebImageViewer viewer = new WebImageViewer(
- NetUtils.createSocketAddr(addr))) {
- viewer.start(inputFile);
- }
- break;
- case "Delimited":
- try (PBImageDelimitedTextWriter writer =
- new PBImageDelimitedTextWriter(out, delimiter, tempPath)) {
- writer.visit(new RandomAccessFile(inputFile, "r"));
- }
- break;
- default:
- System.err.println("Invalid processor specified : " + processor);
- printUsage();
- return -1;
+ case "FileDistribution":
+ long maxSize = Long.parseLong(cmd.getOptionValue("maxSize", "0"));
+ int step = Integer.parseInt(cmd.getOptionValue("step", "0"));
+ boolean formatOutput = cmd.hasOption("format");
+ new FileDistributionCalculator(conf, maxSize, step, formatOutput, out)
+ .visit(new RandomAccessFile(inputFile, "r"));
+ break;
+ case "XML":
+ new PBImageXmlWriter(conf, out).visit(new RandomAccessFile(inputFile,
+ "r"));
+ break;
+ case "ReverseXML":
+ try {
+ OfflineImageReconstructor.run(inputFile, outputFile);
+ } catch (Exception e) {
+ System.err.println("OfflineImageReconstructor failed: "
+ + e.getMessage());
+ e.printStackTrace(System.err);
+ System.exit(1);
+ }
+ break;
+ case "Web":
+ String addr = cmd.getOptionValue("addr", "localhost:5978");
+ try (WebImageViewer viewer =
+ new WebImageViewer(NetUtils.createSocketAddr(addr))) {
+ viewer.start(inputFile);
+ }
+ break;
+ case "Delimited":
+ try (PBImageDelimitedTextWriter writer =
+ new PBImageDelimitedTextWriter(out, delimiter, tempPath)) {
+ writer.visit(new RandomAccessFile(inputFile, "r"));
+ }
+ break;
+ default:
+ System.err.println("Invalid processor specified : " + processor);
+ printUsage();
+ return -1;
}
return 0;
} catch (EOFException e) {
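As a rough sketch of how the new -format flag is parsed, using the same commons-cli calls that appear in the hunks above; the argument values, class name, and choice of PosixParser are assumptions made for illustration:

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.PosixParser;

    public class FormatFlagParsingSketch {
      public static void main(String[] args) throws Exception {
        // Roughly what "hdfs oiv -p FileDistribution -maxSize 0 -step 0 -format
        // -i <fsimage> -o <out>" hands to the option parser (the input/output
        // options are omitted here to keep the sketch self-contained).
        String[] argv = {"-p", "FileDistribution", "-maxSize", "0", "-step", "0", "-format"};
        Options options = new Options();
        options.addOption("p", "processor", true, "");
        options.addOption("maxSize", true, "");
        options.addOption("step", true, "");
        options.addOption("format", false, "");   // boolean flag, takes no argument
        CommandLine cmd = new PosixParser().parse(options, argv);
        long maxSize = Long.parseLong(cmd.getOptionValue("maxSize", "0"));
        int step = Integer.parseInt(cmd.getOptionValue("step", "0"));
        boolean formatOutput = cmd.hasOption("format");
        System.out.println("maxSize=" + maxSize + ", step=" + step
            + ", formatOutput=" + formatOutput);
      }
    }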
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
index d427f887067..29c9ef25cf0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml
@@ -163,7 +163,7 @@
dfs.namenode.http-bind-host
- The actual adress the HTTP server will bind to. If this optional address
+ The actual address the HTTP server will bind to. If this optional address
is set, it overrides only the hostname portion of dfs.namenode.http-address.
It can also be specified per name node or name service for HA/Federation.
This is useful for making the name node HTTP server listen on all
@@ -243,7 +243,7 @@
dfs.namenode.https-bind-host
- The actual adress the HTTPS server will bind to. If this optional address
+ The actual address the HTTPS server will bind to. If this optional address
is set, it overrides only the hostname portion of dfs.namenode.https-address.
It can also be specified per name node or name service for HA/Federation.
This is useful for making the name node HTTPS server listen on all
@@ -650,7 +650,7 @@
dfs.blockreport.initialDelay
- 0
+ 0s
Delay for first block report in seconds. Support multiple time unit
suffix(case insensitive), as described in dfs.heartbeat.interval.
@@ -694,7 +694,7 @@
dfs.datanode.directoryscan.interval
- 21600
+ 21600s
Interval in seconds for Datanode to scan data directories and
reconcile the difference between blocks in memory and on the disk.
Support multiple time unit suffix(case insensitive), as described
@@ -732,7 +732,7 @@
dfs.heartbeat.interval
- 3
+ 3s
Determines datanode heartbeat interval in seconds.
Can use the following suffix (case insensitive):
@@ -942,7 +942,7 @@
dfs.namenode.decommission.interval
- 30
+ 30s
Namenode periodicity in seconds to check if decommission is
complete. Support multiple time unit suffix(case insensitive), as described
in dfs.heartbeat.interval.
@@ -973,7 +973,7 @@
dfs.namenode.replication.interval
- 3
+ 3s
The periodicity in seconds with which the namenode computes
replication work for datanodes. Support multiple time unit suffix(case insensitive),
as described in dfs.heartbeat.interval.
@@ -1071,7 +1071,7 @@
dfs.namenode.checkpoint.period
- 3600
+ 3600s
The number of seconds between two periodic checkpoints.
Support multiple time unit suffix(case insensitive), as described
@@ -1090,7 +1090,7 @@
dfs.namenode.checkpoint.check.period
- 60
+ 60s
The SecondaryNameNode and CheckpointNode will poll the NameNode
every 'dfs.namenode.checkpoint.check.period' seconds to query the number
of uncheckpointed transactions. Support multiple time unit suffix(case insensitive),
@@ -1433,7 +1433,7 @@
dfs.client.datanode-restart.timeout
- 30
+ 30s
Expert only. The time to wait, in seconds, from reception of an
datanode shutdown notification for quick restart, until declaring
@@ -1502,7 +1502,7 @@
dfs.ha.log-roll.period
- 120
+ 120s
How often, in seconds, the StandbyNode should ask the active to
roll edit logs. Since the StandbyNode only reads from finalized
@@ -1516,7 +1516,7 @@
dfs.ha.tail-edits.period
- 60
+ 60s
How often, in seconds, the StandbyNode should check for new
finalized log segments in the shared edits log.
@@ -2950,7 +2950,7 @@
dfs.datanode.bp-ready.timeout
- 20
+ 20s
The maximum wait time for datanode to be ready before failing the
received request. Setting this to 0 fails requests right away if the
@@ -4273,4 +4273,12 @@
a plan.
+
+
+ dfs.lock.suppress.warning.interval
+ 10s
+ Instrumentation reporting long critical sections will suppress
+ consecutive warnings within this interval.
+
+
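The suffixed defaults above (0s, 3s, 3600s, 10s, ...) rely on Hadoop's time-duration parsing. A minimal sketch of how such a value can be read back, using the new dfs.lock.suppress.warning.interval key as the example; the call site shown is illustrative rather than lifted from this patch:

    import java.util.concurrent.TimeUnit;

    import org.apache.hadoop.conf.Configuration;

    public class TimeDurationSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // "10s", "10000ms" and a bare "10000" are all accepted; a bare number is
        // interpreted in the unit passed as the third argument below.
        conf.set("dfs.lock.suppress.warning.interval", "10s");
        long intervalMs = conf.getTimeDuration(
            "dfs.lock.suppress.warning.interval", 10000, TimeUnit.MILLISECONDS);
        System.out.println("suppress interval = " + intervalMs + " ms");
      }
    }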
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
index 31bea7c77f7..06b73904b0a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ArchivalStorage.md
@@ -89,7 +89,7 @@ Note 2: For the erasure coded files with striping layout, the suitable storage p
When a file or directory is created, its storage policy is *unspecified*. The storage policy can be specified using the "[`storagepolicies -setStoragePolicy`](#Set_Storage_Policy)" command. The effective storage policy of a file or directory is resolved by the following rules.
-1. If the file or directory is specificed with a storage policy, return it.
+1. If the file or directory is specified with a storage policy, return it.
2. For an unspecified file or directory, if it is the root directory, return the *default storage policy*. Otherwise, return its parent's effective storage policy.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
index 22886d3c84a..83035bbb200 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md
@@ -239,6 +239,7 @@ Usage: `hdfs oiv [OPTIONS] -i INPUT_FILE`
| `-addr` *address* | Specify the address(host:port) to listen. (localhost:5978 by default). This option is used with Web processor. |
| `-maxSize` *size* | Specify the range [0, maxSize] of file sizes to be analyzed in bytes (128GB by default). This option is used with FileDistribution processor. |
| `-step` *size* | Specify the granularity of the distribution in bytes (2MB by default). This option is used with FileDistribution processor. |
+| `-format` | Format the output result in a human-readable fashion rather than a number of bytes. (false by default). This option is used with FileDistribution processor. |
| `-delimiter` *arg* | Delimiting string to use with Delimited processor. |
| `-t`,`--temp` *temporary dir* | Use temporary dir to cache intermediate result to generate Delimited outputs. If not set, Delimited processor constructs the namespace in memory before outputting text. |
| `-h`,`--help` | Display the tool usage and help information and exit. |
@@ -260,6 +261,9 @@ Usage: `hdfs oiv_legacy [OPTIONS] -i INPUT_FILE -o OUTPUT_FILE`
| COMMAND\_OPTION | Description |
|:---- |:---- |
| `-p`\|`--processor` *processor* | Specify the image processor to apply against the image file. Valid options are Ls (default), XML, Delimited, Indented, and FileDistribution. |
+| `-maxSize` *size* | Specify the range [0, maxSize] of file sizes to be analyzed in bytes (128GB by default). This option is used with FileDistribution processor. |
+| `-step` *size* | Specify the granularity of the distribution in bytes (2MB by default). This option is used with FileDistribution processor. |
+| `-format` | Format the output result in a human-readable fashion rather than a number of bytes. (false by default). This option is used with FileDistribution processor. |
| `-skipBlocks` | Do not enumerate individual blocks within files. This may save processing time and outfile file space on namespaces with very large files. The Ls processor reads the blocks to correctly determine file sizes and ignores this option. |
| `-printToScreen` | Pipe output of processor to console as well as specified file. On extremely large namespaces, this may increase processing time by an order of magnitude. |
| `-delimiter` *arg* | When used in conjunction with the Delimited processor, replaces the default tab delimiter with the string specified by *arg*. |
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md
index aea440a3de9..f2bb2b39945 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSDiskbalancer.md
@@ -102,9 +102,9 @@ or
Plan ID can be read from datanode using query command.
### Report
-Report command provides detailed report about a node.
+Report command provides a detailed report about the specified node(s).
-`hdfs diskbalancer -fs http://namenode.uri -report -node {DataNodeID | IP | Hostname}`
+`hdfs diskbalancer -fs http://namenode.uri -report -node [<DataNodeID|IP|Hostname>,...]`
Settings
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md
index d9f895a7d88..b743233c100 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSHighAvailabilityWithNFS.md
@@ -87,7 +87,7 @@ In order to deploy an HA cluster, you should prepare the following:
* **NameNode machines** - the machines on which you run the Active and Standby NameNodes should have equivalent hardware to each other, and equivalent hardware to what would be used in a non-HA cluster.
-* **Shared storage** - you will need to have a shared directory which the NameNode machines have read/write access to. Typically this is a remote filer which supports NFS and is mounted on each of the NameNode machines. Currently only a single shared edits directory is supported. Thus, the availability of the system is limited by the availability of this shared edits directory, and therefore in order to remove all single points of failure there needs to be redundancy for the shared edits directory. Specifically, multiple network paths to the storage, and redundancy in the storage itself (disk, network, and power). Beacuse of this, it is recommended that the shared storage server be a high-quality dedicated NAS appliance rather than a simple Linux server.
+* **Shared storage** - you will need to have a shared directory which the NameNode machines have read/write access to. Typically this is a remote filer which supports NFS and is mounted on each of the NameNode machines. Currently only a single shared edits directory is supported. Thus, the availability of the system is limited by the availability of this shared edits directory, and therefore in order to remove all single points of failure there needs to be redundancy for the shared edits directory. Specifically, multiple network paths to the storage, and redundancy in the storage itself (disk, network, and power). Because of this, it is recommended that the shared storage server be a high-quality dedicated NAS appliance rather than a simple Linux server.
Note that, in an HA cluster, the Standby NameNodes also perform checkpoints of the namespace state, and thus it is not necessary to run a Secondary NameNode, CheckpointNode, or BackupNode in an HA cluster. In fact, to do so would be an error. This also allows one who is reconfiguring a non-HA-enabled HDFS cluster to be HA-enabled to reuse the hardware which they had previously dedicated to the Secondary NameNode.
@@ -137,7 +137,7 @@ The order in which you set these configurations is unimportant, but the values y
* **dfs.namenode.rpc-address.[nameservice ID].[name node ID]** - the fully-qualified RPC address for each NameNode to listen on
For both of the previously-configured NameNode IDs, set the full address and
- IPC port of the NameNode processs. Note that this results in two separate
+ IPC port of the NameNode process. Note that this results in two separate
configuration options. For example:
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsImageViewer.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsImageViewer.md
index de27fc2679f..f55c9fda024 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsImageViewer.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsImageViewer.md
@@ -50,10 +50,13 @@ The Offline Image Viewer provides several output processors:
..., s[n-1], maxSize], and the processor calculates how many files
in the system fall into each segment [s[i-1], s[i]). Note that
files larger than maxSize always fall into the very last segment.
- The output file is formatted as a tab separated two column table:
- Size and NumFiles. Where Size represents the start of the segment,
+ By default, the output file is formatted as a tab separated two column
+ table: Size and NumFiles, where Size represents the start of the segment,
and numFiles is the number of files form the image which size falls
- in this segment.
+ in this segment. When the option -format is specified, the output file is
+ formatted in a human-readable fashion rather than as a number of bytes in
+ the Size column. In addition, the Size column is renamed to the
+ Size Range column.
4. Delimited (experimental): Generate a text file with all of the elements
common to both inodes and inodes-under-construction, separated by a
@@ -150,6 +153,7 @@ Options
| `-addr` *address* | Specify the address(host:port) to listen. (localhost:5978 by default). This option is used with Web processor. |
| `-maxSize` *size* | Specify the range [0, maxSize] of file sizes to be analyzed in bytes (128GB by default). This option is used with FileDistribution processor. |
| `-step` *size* | Specify the granularity of the distribution in bytes (2MB by default). This option is used with FileDistribution processor. |
+| `-format` | Format the output result in a human-readable fashion rather than a number of bytes. (false by default). This option is used with FileDistribution processor. |
| `-delimiter` *arg* | Delimiting string to use with Delimited processor. |
| `-t`\|`--temp` *temporary dir* | Use temporary dir to cache intermediate result to generate Delimited outputs. If not set, Delimited processor constructs the namespace in memory before outputting text. |
| `-h`\|`--help` | Display the tool usage and help information and exit. |
@@ -181,6 +185,9 @@ Due to the internal layout changes introduced by the ProtocolBuffer-based fsimag
| `-i`\|`--inputFile` *input file* | Specify the input fsimage file to process. Required. |
| `-o`\|`--outputFile` *output file* | Specify the output filename, if the specified output processor generates one. If the specified file already exists, it is silently overwritten. Required. |
| `-p`\|`--processor` *processor* | Specify the image processor to apply against the image file. Valid options are Ls (default), XML, Delimited, Indented, and FileDistribution. |
+| `-maxSize` *size* | Specify the range [0, maxSize] of file sizes to be analyzed in bytes (128GB by default). This option is used with FileDistribution processor. |
+| `-step` *size* | Specify the granularity of the distribution in bytes (2MB by default). This option is used with FileDistribution processor. |
+| `-format` | Format the output result in a human-readable fashion rather than a number of bytes. (false by default). This option is used with FileDistribution processor. |
| `-skipBlocks` | Do not enumerate individual blocks within files. This may save processing time and outfile file space on namespaces with very large files. The Ls processor reads the blocks to correctly determine file sizes and ignores this option. |
| `-printToScreen` | Pipe output of processor to console as well as specified file. On extremely large namespaces, this may increase processing time by an order of magnitude. |
| `-delimiter` *arg* | When used in conjunction with the Delimited processor, replaces the default tab delimiter with the string specified by *arg*. |
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsMultihoming.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsMultihoming.md
index 4be55118f5e..4e1d4804f4f 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsMultihoming.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsMultihoming.md
@@ -86,7 +86,7 @@ The solution is to have separate setting for server endpoints to force binding t
dfs.namenode.http-bind-host
0.0.0.0
- The actual adress the HTTP server will bind to. If this optional address
+ The actual address the HTTP server will bind to. If this optional address
is set, it overrides only the hostname portion of dfs.namenode.http-address.
It can also be specified per name node or name service for HA/Federation.
This is useful for making the name node HTTP server listen on all
@@ -98,7 +98,7 @@ The solution is to have separate setting for server endpoints to force binding t
dfs.namenode.https-bind-host
0.0.0.0
- The actual adress the HTTPS server will bind to. If this optional address
+ The actual address the HTTPS server will bind to. If this optional address
is set, it overrides only the hostname portion of dfs.namenode.https-address.
It can also be specified per name node or name service for HA/Federation.
This is useful for making the name node HTTPS server listen on all
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsNfsGateway.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsNfsGateway.md
index 67311891586..37a2042d089 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsNfsGateway.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HdfsNfsGateway.md
@@ -148,7 +148,7 @@ It's strongly recommended for the users to update a few configuration properties
characters. The machine name format can be a single host, a "*", a Java regular expression, or an IPv4 address. The access
privilege uses rw or ro to specify read/write or read-only access of the machines to exports. If the access privilege is not provided, the default is read-only. Entries are separated by ";".
For example: "192.168.0.0/22 rw ; \\\\w\*\\\\.example\\\\.com ; host1.test.org ro;". Only the NFS gateway needs to restart after
- this property is updated. Note that, here Java regular expression is differnt with the regrulation expression used in
+ this property is updated. Note that the Java regular expression here is different from the expression format used in
Linux NFS export table, such as, using "\\\\w\*\\\\.example\\\\.com" instead of "\*.example.com", "192\\\\.168\\\\.0\\\\.(11|22)"
instead of "192.168.0.[11|22]" and so on.
@@ -183,7 +183,7 @@ It's strongly recommended for the users to update a few configuration properties
* JVM and log settings. You can export JVM settings (e.g., heap size and GC log) in
- HADOOP\_NFS3\_OPTS. More NFS related settings can be found in hadoop-env.sh.
+ HDFS\_NFS3\_OPTS. More NFS related settings can be found in hadoop-env.sh.
To get NFS debug trace, you can edit the log4j.property file
to add the following. Note, debug trace, especially for ONCRPC, can be very verbose.
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
index 94662f55df6..5f88deffe2a 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/ViewFs.md
@@ -143,7 +143,7 @@ Hence on Cluster X, where the `core-site.xml` is set to make the default fs to u
### Pathname Usage Best Practices
-When one is within a cluster, it is recommended to use the pathname of type (1) above instead of a fully qualified URI like (2). Futher, applications should not use the knowledge of the mount points and use a path like `hdfs://namenodeContainingUserDirs:port/joe/foo/bar` to refer to a file in a particular namenode. One should use `/user/joe/foo/bar` instead.
+When one is within a cluster, it is recommended to use the pathname of type (1) above instead of a fully qualified URI like (2). Further, applications should not use the knowledge of the mount points and use a path like `hdfs://namenodeContainingUserDirs:port/joe/foo/bar` to refer to a file in a particular namenode. One should use `/user/joe/foo/bar` instead.
### Renaming Pathnames Across Namespaces
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
index 18c2de91d79..1e27745e499 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestDFSStripedInputStream.java
@@ -57,7 +57,8 @@ import static org.junit.Assert.assertTrue;
public class TestDFSStripedInputStream {
- public static final Log LOG = LogFactory.getLog(TestDFSStripedInputStream.class);
+ public static final Log LOG =
+ LogFactory.getLog(TestDFSStripedInputStream.class);
private MiniDFSCluster cluster;
private Configuration conf = new Configuration();
@@ -272,12 +273,16 @@ public class TestDFSStripedInputStream {
// |10 |
done += in.read(0, readBuffer, 0, delta);
assertEquals(delta, done);
+ assertArrayEquals(Arrays.copyOf(expected, done),
+ Arrays.copyOf(readBuffer, done));
// both head and trail cells are partial
// |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 |
// |256K - 10|missing|256K|256K|256K - 10|not in range|
done += in.read(delta, readBuffer, delta,
CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
+ assertArrayEquals(Arrays.copyOf(expected, done),
+ Arrays.copyOf(readBuffer, done));
// read the rest
done += in.read(done, readBuffer, done, readSize - done);
assertEquals(readSize, done);
@@ -291,8 +296,8 @@ public class TestDFSStripedInputStream {
testStatefulRead(true, true);
}
- private void testStatefulRead(boolean useByteBuffer, boolean cellMisalignPacket)
- throws Exception {
+ private void testStatefulRead(boolean useByteBuffer,
+ boolean cellMisalignPacket) throws Exception {
final int numBlocks = 2;
final int fileSize = numBlocks * BLOCK_GROUP_SIZE;
if (cellMisalignPacket) {
@@ -302,7 +307,8 @@ public class TestDFSStripedInputStream {
}
DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
NUM_STRIPE_PER_BLOCK, false);
- LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(filePath.toString(), 0, fileSize);
+ LocatedBlocks lbs = fs.getClient().namenode.
+ getBlockLocations(filePath.toString(), 0, fileSize);
assert lbs.getLocatedBlocks().size() == numBlocks;
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
@@ -360,4 +366,111 @@ public class TestDFSStripedInputStream {
}
fs.delete(filePath, true);
}
+
+ @Test
+ public void testStatefulReadWithDNFailure() throws Exception {
+ final int numBlocks = 4;
+ final int failedDNIdx = DATA_BLK_NUM - 1;
+ DFSTestUtil.createStripedFile(cluster, filePath, null, numBlocks,
+ NUM_STRIPE_PER_BLOCK, false);
+ LocatedBlocks lbs = fs.getClient().namenode.getBlockLocations(
+ filePath.toString(), 0, BLOCK_GROUP_SIZE);
+
+ assert lbs.get(0) instanceof LocatedStripedBlock;
+ LocatedStripedBlock bg = (LocatedStripedBlock) (lbs.get(0));
+ for (int i = 0; i < DATA_BLK_NUM + PARITY_BLK_NUM; i++) {
+ Block blk = new Block(bg.getBlock().getBlockId() + i,
+ NUM_STRIPE_PER_BLOCK * CELLSIZE,
+ bg.getBlock().getGenerationStamp());
+ blk.setGenerationStamp(bg.getBlock().getGenerationStamp());
+ cluster.injectBlocks(i, Arrays.asList(blk),
+ bg.getBlock().getBlockPoolId());
+ }
+ DFSStripedInputStream in =
+ new DFSStripedInputStream(fs.getClient(), filePath.toString(), false,
+ ecPolicy, null);
+ int readSize = BLOCK_GROUP_SIZE;
+ byte[] readBuffer = new byte[readSize];
+ byte[] expected = new byte[readSize];
+ /** A variation of {@link DFSTestUtil#fillExpectedBuf} for striped blocks */
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ for (int k = 0; k < CELLSIZE; k++) {
+ int posInBlk = i * CELLSIZE + k;
+ int posInFile = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE + k;
+ expected[posInFile] = SimulatedFSDataset.simulatedByte(
+ new Block(bg.getBlock().getBlockId() + j), posInBlk);
+ }
+ }
+ }
+
+ ErasureCoderOptions coderOptions = new ErasureCoderOptions(
+ DATA_BLK_NUM, PARITY_BLK_NUM);
+ RawErasureDecoder rawDecoder = CodecUtil.createRawDecoder(conf,
+ ecPolicy.getCodecName(), coderOptions);
+
+ // Update the expected content for decoded data
+ int[] missingBlkIdx = new int[PARITY_BLK_NUM];
+ for (int i = 0; i < missingBlkIdx.length; i++) {
+ if (i == 0) {
+ missingBlkIdx[i] = failedDNIdx;
+ } else {
+ missingBlkIdx[i] = DATA_BLK_NUM + i;
+ }
+ }
+ cluster.stopDataNode(failedDNIdx);
+ for (int i = 0; i < NUM_STRIPE_PER_BLOCK; i++) {
+ byte[][] decodeInputs = new byte[DATA_BLK_NUM + PARITY_BLK_NUM][CELLSIZE];
+ byte[][] decodeOutputs = new byte[missingBlkIdx.length][CELLSIZE];
+ for (int j = 0; j < DATA_BLK_NUM; j++) {
+ int posInBuf = i * CELLSIZE * DATA_BLK_NUM + j * CELLSIZE;
+ if (j != failedDNIdx) {
+ System.arraycopy(expected, posInBuf, decodeInputs[j], 0, CELLSIZE);
+ }
+ }
+ for (int j = DATA_BLK_NUM; j < DATA_BLK_NUM + PARITY_BLK_NUM; j++) {
+ for (int k = 0; k < CELLSIZE; k++) {
+ int posInBlk = i * CELLSIZE + k;
+ decodeInputs[j][k] = SimulatedFSDataset.simulatedByte(
+ new Block(bg.getBlock().getBlockId() + j), posInBlk);
+ }
+ }
+ for (int m : missingBlkIdx) {
+ decodeInputs[m] = null;
+ }
+ rawDecoder.decode(decodeInputs, missingBlkIdx, decodeOutputs);
+ int posInBuf = i * CELLSIZE * DATA_BLK_NUM + failedDNIdx * CELLSIZE;
+ System.arraycopy(decodeOutputs[0], 0, expected, posInBuf, CELLSIZE);
+ }
+
+ int delta = 10;
+ int done = 0;
+ // read a small delta, shouldn't trigger decode
+ // |cell_0 |
+ // |10 |
+ done += in.read(readBuffer, 0, delta);
+ assertEquals(delta, done);
+ // both head and trail cells are partial
+ // |c_0 |c_1 |c_2 |c_3 |c_4 |c_5 |
+ // |256K - 10|missing|256K|256K|256K - 10|not in range|
+ while (done < (CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta)) {
+ int ret = in.read(readBuffer, delta,
+ CELLSIZE * (DATA_BLK_NUM - 1) - 2 * delta);
+ assertTrue(ret > 0);
+ done += ret;
+ }
+ assertEquals(CELLSIZE * (DATA_BLK_NUM - 1) - delta, done);
+ // read the rest
+
+ int restSize;
+ restSize = readSize - done;
+ while (done < restSize) {
+ int ret = in.read(readBuffer, done, restSize);
+ assertTrue(ret > 0);
+ done += ret;
+ }
+
+ assertEquals(readSize, done);
+ assertArrayEquals(expected, readBuffer);
+ }
}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
new file mode 100644
index 00000000000..f470688a184
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/TestInstrumentedLock.java
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs;
+
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Lock;
+
+import org.apache.hadoop.util.AutoCloseableLock;
+import org.apache.hadoop.util.Timer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import static org.mockito.Mockito.*;
+import static org.junit.Assert.*;
+
+/**
+ * A test class for InstrumentedLock.
+ */
+public class TestInstrumentedLock {
+
+ static final Log LOG = LogFactory.getLog(TestInstrumentedLock.class);
+
+ @Rule public TestName name = new TestName();
+
+ /**
+ * Test exclusive access of the lock.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testMultipleThread() throws Exception {
+ String testname = name.getMethodName();
+ InstrumentedLock lock = new InstrumentedLock(testname, LOG, 0, 300);
+ lock.lock();
+ try {
+ Thread competingThread = new Thread() {
+ @Override
+ public void run() {
+ assertFalse(lock.tryLock());
+ }
+ };
+ competingThread.start();
+ competingThread.join();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Test the correctness with try-with-resource syntax.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testTryWithResourceSyntax() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicReference<Thread> lockThread = new AtomicReference<>(null);
+ Lock lock = new InstrumentedLock(testname, LOG, 0, 300) {
+ @Override
+ public void lock() {
+ super.lock();
+ lockThread.set(Thread.currentThread());
+ }
+ @Override
+ public void unlock() {
+ super.unlock();
+ lockThread.set(null);
+ }
+ };
+ AutoCloseableLock acl = new AutoCloseableLock(lock);
+ try (AutoCloseable localLock = acl.acquire()) {
+ assertEquals(acl, localLock);
+ Thread competingThread = new Thread() {
+ @Override
+ public void run() {
+ assertNotEquals(Thread.currentThread(), lockThread.get());
+ assertFalse(lock.tryLock());
+ }
+ };
+ competingThread.start();
+ competingThread.join();
+ assertEquals(Thread.currentThread(), lockThread.get());
+ }
+ assertNull(lockThread.get());
+ }
+
+ /**
+ * Test the lock logs warning when lock held time is greater than threshold
+ * and not log warning otherwise.
+ * @throws Exception
+ */
+ @Test(timeout=10000)
+ public void testLockLongHoldingReport() throws Exception {
+ String testname = name.getMethodName();
+ final AtomicLong time = new AtomicLong(0);
+ Timer mclock = new Timer() {
+ @Override
+ public long monotonicNow() {
+ return time.get();
+ }
+ };
+ Lock mlock = mock(Lock.class);
+
+ final AtomicLong wlogged = new AtomicLong(0);
+ final AtomicLong wsuppresed = new AtomicLong(0);
+ InstrumentedLock lock = new InstrumentedLock(
+ testname, LOG, mlock, 2000, 300, mclock) {
+ @Override
+ void logWarning(long lockHeldTime, long suppressed) {
+ wlogged.incrementAndGet();
+ wsuppresed.set(suppressed);
+ }
+ };
+
+ // do not log warning when the lock held time is short
+ lock.lock(); // t = 0
+ time.set(200);
+ lock.unlock(); // t = 200
+ assertEquals(0, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ lock.lock(); // t = 200
+ time.set(700);
+ lock.unlock(); // t = 700
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ // despite the lock held time is greater than threshold
+ // suppress the log warning due to the logging gap
+ // (not recorded in wsuppressed until next log message)
+ lock.lock(); // t = 700
+ time.set(1100);
+ lock.unlock(); // t = 1100
+ assertEquals(1, wlogged.get());
+ assertEquals(0, wsuppresed.get());
+
+ // log a warning message when the lock held time is greater the threshold
+ // and the logging time gap is satisfied. Also should display suppressed
+ // previous warnings.
+ time.set(2400);
+ lock.lock(); // t = 2400
+ time.set(2800);
+ lock.unlock(); // t = 2800
+ assertEquals(2, wlogged.get());
+ assertEquals(1, wsuppresed.get());
+ }
+
+}
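Beyond the unit test above, a minimal usage sketch of InstrumentedLock in caller code; the lock name and thresholds are hypothetical, and the constructor used is the same four-argument form exercised by testMultipleThread:

    import java.util.concurrent.locks.Lock;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.hdfs.InstrumentedLock;

    public class InstrumentedLockUsageSketch {
      private static final Log LOG =
          LogFactory.getLog(InstrumentedLockUsageSketch.class);

      public static void main(String[] args) {
        // Warn if the lock is held for more than 300 ms, but emit at most one
        // warning every 10 seconds; additional warnings are counted and reported
        // together with the next logged message.
        Lock lock = new InstrumentedLock("sketchLock", LOG, 10000, 300);
        lock.lock();
        try {
          // critical section guarded by the instrumented lock
        } finally {
          lock.unlock();
        }
      }
    }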
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
new file mode 100644
index 00000000000..0b7ee337d8b
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/security/TestClientProtocolWithDelegationToken.java
@@ -0,0 +1,119 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdfs.security;
+
+import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION;
+import static org.mockito.Mockito.mock;
+
+import java.net.InetSocketAddress;
+import java.security.PrivilegedExceptionAction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.protocol.ClientProtocol;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
+import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenSecretManager;
+import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.ipc.Client;
+import org.apache.hadoop.ipc.RPC;
+import org.apache.hadoop.ipc.Server;
+import org.apache.hadoop.net.NetUtils;
+import org.apache.hadoop.security.SaslInputStream;
+import org.apache.hadoop.security.SaslRpcClient;
+import org.apache.hadoop.security.SaslRpcServer;
+import org.apache.hadoop.security.SecurityUtil;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.test.GenericTestUtils;
+import org.apache.log4j.Level;
+import org.junit.Test;
+
+/** Unit tests for using Delegation Token over RPC. */
+public class TestClientProtocolWithDelegationToken {
+ private static final String ADDRESS = "0.0.0.0";
+
+ public static final Log LOG = LogFactory
+ .getLog(TestClientProtocolWithDelegationToken.class);
+
+ private static final Configuration conf;
+ static {
+ conf = new Configuration();
+ conf.set(HADOOP_SECURITY_AUTHENTICATION, "kerberos");
+ UserGroupInformation.setConfiguration(conf);
+ }
+
+ static {
+ GenericTestUtils.setLogLevel(Client.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(Server.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(SaslRpcClient.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(SaslRpcServer.LOG, Level.ALL);
+ GenericTestUtils.setLogLevel(SaslInputStream.LOG, Level.ALL);
+ }
+
+ @Test
+ public void testDelegationTokenRpc() throws Exception {
+ ClientProtocol mockNN = mock(ClientProtocol.class);
+ FSNamesystem mockNameSys = mock(FSNamesystem.class);
+
+ DelegationTokenSecretManager sm = new DelegationTokenSecretManager(
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_KEY_UPDATE_INTERVAL_DEFAULT,
+ DFSConfigKeys.DFS_NAMENODE_DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT,
+ 3600000, mockNameSys);
+ sm.startThreads();
+ final Server server = new RPC.Builder(conf)
+ .setProtocol(ClientProtocol.class).setInstance(mockNN)
+ .setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
+ .setSecretManager(sm).build();
+
+ server.start();
+
+ final UserGroupInformation current = UserGroupInformation.getCurrentUser();
+ final InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ String user = current.getUserName();
+ Text owner = new Text(user);
+ DelegationTokenIdentifier dtId = new DelegationTokenIdentifier(owner, owner, null);
+ Token<DelegationTokenIdentifier> token = new Token<DelegationTokenIdentifier>(
+ dtId, sm);
+ SecurityUtil.setTokenService(token, addr);
+ LOG.info("Service for token is " + token.getService());
+ current.addToken(token);
+ current.doAs(new PrivilegedExceptionAction