HDFS-10518. DiskBalancer: Pretty-print json in Query command. Contributed by Anu Engineer.

This commit is contained in:
Anu Engineer 2016-06-13 14:11:23 -07:00 committed by Arpit Agarwal
parent af11ab34d0
commit 7e2be5c4a0
5 changed files with 116 additions and 6 deletions

View File

@ -34,6 +34,8 @@
@InterfaceStability.Unstable @InterfaceStability.Unstable
@JsonInclude(JsonInclude.Include.NON_DEFAULT) @JsonInclude(JsonInclude.Include.NON_DEFAULT)
public class DiskBalancerWorkItem { public class DiskBalancerWorkItem {
private long startTime;
private long secondsElapsed;
private long bytesToCopy; private long bytesToCopy;
private long bytesCopied; private long bytesCopied;
private long errorCount; private long errorCount;
@ -242,4 +244,44 @@ public long getBandwidth() {
public void setBandwidth(long bandwidth) { public void setBandwidth(long bandwidth) {
this.bandwidth = bandwidth; this.bandwidth = bandwidth;
} }
/**
 * Returns the timestamp at which execution of this work item began.
 *
 * @return start time, in milliseconds since the epoch.
 */
public long getStartTime() {
  return this.startTime;
}
/**
 * Records the timestamp at which execution of this work item began.
 *
 * @param startTime start timestamp, in milliseconds since the epoch.
 */
public void setStartTime(long startTime) {
  this.startTime = startTime;
}
/**
 * Returns the number of seconds elapsed since the start time.
 *
 * This counter exists (rather than being derived by the caller) because of
 * clock skew: the client's wall clock may differ from the server's, so the
 * elapsed time cannot be reliably computed from {@code startTime} alone.
 *
 * @return seconds elapsed since execution started.
 */
public long getSecondsElapsed() {
  return this.secondsElapsed;
}
/**
 * Records the number of seconds elapsed since the start time.
 *
 * Updated together with the other progress counters.
 *
 * @param secondsElapsed elapsed time, in seconds.
 */
public void setSecondsElapsed(long secondsElapsed) {
  this.secondsElapsed = secondsElapsed;
}
} }

View File

@ -24,6 +24,7 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializationConfig;
import static org.codehaus.jackson.map.type.TypeFactory.defaultInstance; import static org.codehaus.jackson.map.type.TypeFactory.defaultInstance;
@ -128,6 +129,7 @@ public List<DiskBalancerWorkEntry> getCurrentState() {
**/ **/
public String currentStateString() throws IOException { public String currentStateString() throws IOException {
ObjectMapper mapper = new ObjectMapper(); ObjectMapper mapper = new ObjectMapper();
mapper.enable(SerializationConfig.Feature.INDENT_OUTPUT);
return mapper.writeValueAsString(currentState); return mapper.writeValueAsString(currentState);
} }

View File

@ -552,6 +552,21 @@ public interface BlockMover {
* @return FsDatasetSpi * @return FsDatasetSpi
*/ */
FsDatasetSpi getDataset(); FsDatasetSpi getDataset();
/**
 * Returns the time when execution of this plan started.
 *
 * NOTE(review): implementations report milliseconds since the epoch
 * (set via Time.now() in copyBlocks) — confirm for any new implementor.
 *
 * @return Start time in milliseconds.
 */
long getStartTime();
/**
 * Returns the number of seconds elapsed since the plan started executing.
 *
 * Kept as a separate counter (rather than derived from getStartTime())
 * so callers are not affected by clock skew between client and server.
 *
 * @return time in seconds
 */
long getElapsedSeconds();
} }
/** /**
@ -622,6 +637,8 @@ public static class DiskBalancerMover implements BlockMover {
private long maxDiskErrors; private long maxDiskErrors;
private int poolIndex; private int poolIndex;
private AtomicBoolean shouldRun; private AtomicBoolean shouldRun;
private long startTime;
private long secondsElapsed;
/** /**
* Constructs diskBalancerMover. * Constructs diskBalancerMover.
@ -897,6 +914,9 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
FsVolumeSpi source = pair.getSource(); FsVolumeSpi source = pair.getSource();
FsVolumeSpi dest = pair.getDest(); FsVolumeSpi dest = pair.getDest();
List<FsVolumeSpi.BlockIterator> poolIters = new LinkedList<>(); List<FsVolumeSpi.BlockIterator> poolIters = new LinkedList<>();
startTime = Time.now();
item.setStartTime(startTime);
secondsElapsed = 0;
if (source.isTransientStorage() || dest.isTransientStorage()) { if (source.isTransientStorage() || dest.isTransientStorage()) {
return; return;
@ -937,7 +957,7 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
if (block == null) { if (block == null) {
this.setExitFlag(); this.setExitFlag();
LOG.error("No source blocks, exiting the copy. Source: {}, " + LOG.error("No source blocks, exiting the copy. Source: {}, " +
"dest:{}", source.getBasePath(), dest.getBasePath()); "dest:{}", source.getBasePath(), dest.getBasePath());
continue; continue;
} }
@ -973,9 +993,6 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
block.getNumBytes(), source.getBasePath(), block.getNumBytes(), source.getBasePath(),
dest.getBasePath()); dest.getBasePath());
item.incCopiedSoFar(block.getNumBytes());
item.incBlocksCopied();
// Check for the max throughput constraint. // Check for the max throughput constraint.
// We sleep here to keep the promise that we will not // We sleep here to keep the promise that we will not
// copy more than Max MB/sec. we sleep enough time // copy more than Max MB/sec. we sleep enough time
@ -984,6 +1001,14 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
// we exit via Thread Interrupted exception. // we exit via Thread Interrupted exception.
Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item)); Thread.sleep(computeDelay(block.getNumBytes(), timeUsed, item));
// We delay updating the info to avoid confusing the user.
// This way we report the copy only if it is under the
// throughput threshold.
item.incCopiedSoFar(block.getNumBytes());
item.incBlocksCopied();
secondsElapsed = TimeUnit.MILLISECONDS.toSeconds(Time.now() -
startTime);
item.setSecondsElapsed(secondsElapsed);
} catch (IOException ex) { } catch (IOException ex) {
LOG.error("Exception while trying to copy blocks. error: {}", ex); LOG.error("Exception while trying to copy blocks. error: {}", ex);
item.incErrorCount(); item.incErrorCount();
@ -1009,5 +1034,25 @@ public void copyBlocks(VolumePair pair, DiskBalancerWorkItem item) {
public FsDatasetSpi getDataset() { public FsDatasetSpi getDataset() {
return dataset; return dataset;
} }
/**
 * Returns the timestamp at which this plan began executing.
 *
 * @return start time, in milliseconds since the epoch.
 */
@Override
public long getStartTime() {
  return this.startTime;
}
/**
 * Returns the seconds elapsed since the plan started executing.
 *
 * @return elapsed time, in seconds.
 */
@Override
public long getElapsedSeconds() {
  return this.secondsElapsed;
}
} }
} }

View File

@ -46,6 +46,7 @@ public QueryCommand(Configuration conf) {
" plan running on a given datanode."); " plan running on a given datanode.");
addValidCommandParameters(DiskBalancer.VERBOSE, "Prints verbose results."); addValidCommandParameters(DiskBalancer.VERBOSE, "Prints verbose results.");
} }
/** /**
* Executes the Client Calls. * Executes the Client Calls.
* *
@ -62,7 +63,7 @@ public void execute(CommandLine cmd) throws Exception {
String nodeAddress = nodeName; String nodeAddress = nodeName;
// if the string is not name:port format use the default port. // if the string is not name:port format use the default port.
if(!nodeName.matches("^.*:\\d$")) { if (!nodeName.matches("^.*:\\d$")) {
int defaultIPC = NetUtils.createSocketAddr( int defaultIPC = NetUtils.createSocketAddr(
getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY, getConf().getTrimmed(DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_KEY,
DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort(); DFSConfigKeys.DFS_DATANODE_IPC_ADDRESS_DEFAULT)).getPort();
@ -76,7 +77,7 @@ public void execute(CommandLine cmd) throws Exception {
System.out.printf("Plan ID: %s %nResult: %s%n", workStatus.getPlanID(), System.out.printf("Plan ID: %s %nResult: %s%n", workStatus.getPlanID(),
workStatus.getResult().toString()); workStatus.getResult().toString());
if(cmd.hasOption(DiskBalancer.VERBOSE)) { if (cmd.hasOption(DiskBalancer.VERBOSE)) {
System.out.printf("%s", workStatus.currentStateString()); System.out.printf("%s", workStatus.currentStateString());
} }
} catch (DiskBalancerException ex) { } catch (DiskBalancerException ex) {

View File

@ -436,6 +436,26 @@ public FsDatasetSpi getDataset() {
return this.dataset; return this.dataset;
} }
/**
 * Returns time when this plan started executing.
 *
 * NOTE(review): this implementation always returns 0 — presumably a
 * test/mock mover that does not track a start time; confirm callers
 * do not rely on a real timestamp here.
 *
 * @return Start time in milliseconds.
 */
@Override
public long getStartTime() {
return 0;
}
/**
 * Number of seconds elapsed.
 *
 * NOTE(review): this implementation always returns 0 — presumably a
 * test/mock mover that does not track elapsed time; confirm callers
 * do not rely on a real value here.
 *
 * @return time in seconds
 */
@Override
public long getElapsedSeconds() {
return 0;
}
public int getRunCount() { public int getRunCount() {
synchronized (runCount) { synchronized (runCount) {
LOG.info("Run count : " + runCount.intValue()); LOG.info("Run count : " + runCount.intValue());