HDFS-6841. Use Time.monotonicNow() wherever applicable instead of Time.now(). Contributed by Vinayakumar B

(cherry picked from commit 99a8dcd19528b265d4fda9ae09a17e4af52f2782)

Conflicts:
	hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/DatanodeInfo.java
	hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java
Kihwal Lee 2015-03-20 13:52:09 -05:00
parent 6be52e42a9
commit 29642b33cb
48 changed files with 303 additions and 236 deletions
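The change is mechanical, but the motivation is worth stating: Time.now() is wall-clock time (System.currentTimeMillis() underneath) and can jump forward or backward when the system clock is adjusted, e.g. by NTP, so subtracting two readings is unsafe for measuring elapsed time. Time.monotonicNow() (System.nanoTime() underneath, scaled to milliseconds) only moves forward within a process. A minimal standalone sketch of the pattern this commit applies everywhere — the class below is illustrative, not part of the commit:

public class ClockChoice {
  // Elapsed time from the wall clock: can be wrong, even negative, if the
  // system clock is stepped between the two reads. This is the risk in
  // Time.now()-based intervals.
  static long elapsedWallMs(long startMs) {
    return System.currentTimeMillis() - startMs;
  }

  // Elapsed time from the monotonic clock: immune to clock adjustments.
  // Hadoop's Time.monotonicNow() is System.nanoTime() scaled to millis.
  static long elapsedMonoMs(long startNs) {
    return (System.nanoTime() - startNs) / 1_000_000L;
  }

  public static void main(String[] args) throws InterruptedException {
    long wallStart = System.currentTimeMillis();
    long monoStart = System.nanoTime();
    Thread.sleep(100);
    System.out.println("wall ms: " + elapsedWallMs(wallStart));
    System.out.println("mono ms: " + elapsedMonoMs(monoStart));
  }
}

The corollary, visible throughout the diff below, is that monotonic readings have no meaning as calendar time: wherever a value is converted to a Date or shown to users, the wall-clock reading is kept.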

View File

@ -926,6 +926,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7957. Truncate should verify quota before making changes. (jing9)
HDFS-6841. Use Time.monotonicNow() wherever applicable instead of Time.now()
(Vinayakumar B via kihwal)
BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS
HDFS-7720. Quota by Storage Type API, tools and ClientNameNode

View File

@ -887,7 +887,7 @@ void updateLastLeaseRenewal() {
if (filesBeingWritten.isEmpty()) {
return;
}
lastLeaseRenewal = Time.now();
lastLeaseRenewal = Time.monotonicNow();
}
}
@ -904,7 +904,7 @@ boolean renewLease() throws IOException {
return true;
} catch (IOException e) {
// Abort if the lease has already expired.
final long elapsed = Time.now() - getLastLeaseRenewal();
final long elapsed = Time.monotonicNow() - getLastLeaseRenewal();
if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) {
LOG.warn("Failed to renew lease for " + clientName + " for "
+ (elapsed/1000) + " seconds (>= hard-limit ="
@ -1022,7 +1022,7 @@ public long getBlockSize(String f) throws IOException {
* @see ClientProtocol#getServerDefaults()
*/
public FsServerDefaults getServerDefaults() throws IOException {
long now = Time.now();
long now = Time.monotonicNow();
if (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD) {
serverDefaults = namenode.getServerDefaults();
serverDefaultsLastUpdate = now;

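The hunk above is one instance of an interval-gated cache: refresh a value only when its monotonic age exceeds a validity window. A condensed sketch of the same shape, assuming a generic loader in place of the namenode.getServerDefaults() RPC (names here are illustrative, not the DFSClient fields):

import java.util.function.Supplier;

class ExpiringCache<T> {
  private final long validityMs;
  private long lastUpdateMono;
  private boolean loaded = false;
  private T value;

  ExpiringCache(long validityMs) { this.validityMs = validityMs; }

  synchronized T get(Supplier<T> loader) {
    long now = System.nanoTime() / 1_000_000L; // stands in for Time.monotonicNow()
    if (!loaded || now - lastUpdateMono > validityMs) {
      value = loader.get(); // e.g. an RPC to the NameNode
      lastUpdateMono = now;
      loaded = true;
    }
    return value;
  }
}

With a wall clock, a backward step could make now - lastUpdateMono negative and keep serving a stale value long past its validity period; the monotonic clock removes that failure mode.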
View File

@ -381,7 +381,7 @@ private void endBlock() {
*/
@Override
public void run() {
long lastPacket = Time.now();
long lastPacket = Time.monotonicNow();
TraceScope scope = NullScope.INSTANCE;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
@ -405,7 +405,7 @@ public void run() {
synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.now();
long now = Time.monotonicNow();
while ((!streamerClosed && !hasError && dfsClient.clientRunning
&& dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
@ -421,7 +421,7 @@ public void run() {
DFSClient.LOG.warn("Caught exception ", e);
}
doSleep = false;
now = Time.now();
now = Time.monotonicNow();
}
if (streamerClosed || hasError || !dfsClient.clientRunning) {
continue;
@ -520,7 +520,7 @@ public void run() {
} finally {
writeScope.close();
}
lastPacket = Time.now();
lastPacket = Time.monotonicNow();
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
@ -759,8 +759,8 @@ public void run() {
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply) &&
shouldWaitForRestart(i)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
Time.now();
restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+ Time.monotonicNow();
setRestartingNodeIndex(i);
String message = "A datanode is restarting: " + targets[i];
DFSClient.LOG.info(message);
@ -1174,7 +1174,7 @@ private boolean setupPipelineForAppendOrRecovery() throws IOException {
errorIndex = -1;
}
// still within the deadline
if (Time.now() < restartDeadline) {
if (Time.monotonicNow() < restartDeadline) {
continue; // within the deadline
}
// expired. declare the restarting node dead
@ -1225,14 +1225,12 @@ private LocatedBlock nextBlockOutputStream() throws IOException {
errorIndex = -1;
success = false;
long startTime = Time.now();
DatanodeInfo[] excluded =
excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
.keySet()
.toArray(new DatanodeInfo[0]);
block = oldBlock;
lb = locateFollowingBlock(startTime,
excluded.length > 0 ? excluded : null);
lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
block = lb.getBlock();
block.setNumBytes(0);
bytesSent = 0;
@ -1379,7 +1377,7 @@ private boolean createBlockOutputStream(DatanodeInfo[] nodes,
// Check whether there is a restart worth waiting for.
if (checkRestart && shouldWaitForRestart(errorIndex)) {
restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
Time.now();
Time.monotonicNow();
restartingNodeIndex.set(errorIndex);
errorIndex = -1;
DFSClient.LOG.info("Waiting for the datanode to be restarted: " +
@ -1429,13 +1427,12 @@ private boolean[] getPinnings(DatanodeInfo[] nodes, boolean shouldLog) {
}
}
private LocatedBlock locateFollowingBlock(long start,
DatanodeInfo[] excludedNodes) throws IOException {
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes)
throws IOException {
int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
long sleeptime = dfsClient.getConf().
blockWriteLocateFollowingInitialDelayMs;
while (true) {
long localstart = Time.now();
long localstart = Time.monotonicNow();
while (true) {
try {
return dfsClient.namenode.addBlock(src, dfsClient.clientName,
@ -1459,10 +1456,10 @@ private LocatedBlock locateFollowingBlock(long start,
} else {
--retries;
DFSClient.LOG.info("Exception while adding a block", e);
if (Time.now() - localstart > 5000) {
long elapsed = Time.monotonicNow() - localstart;
if (elapsed > 5000) {
DFSClient.LOG.info("Waiting for replication for "
+ (Time.now() - localstart) / 1000
+ " seconds");
+ (elapsed / 1000) + " seconds");
}
try {
DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
@ -2257,7 +2254,7 @@ private synchronized void closeImpl() throws IOException {
// should be called holding (this) lock since setTestFilename() may
// be called during unit tests
private void completeFile(ExtendedBlock last) throws IOException {
long localstart = Time.now();
long localstart = Time.monotonicNow();
long sleeptime = dfsClient.getConf().
blockWriteLocateFollowingInitialDelayMs;
boolean fileComplete = false;
@ -2267,8 +2264,9 @@ private void completeFile(ExtendedBlock last) throws IOException {
dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
if (!fileComplete) {
final int hdfsTimeout = dfsClient.getHdfsTimeout();
if (!dfsClient.clientRunning ||
(hdfsTimeout > 0 && localstart + hdfsTimeout < Time.now())) {
if (!dfsClient.clientRunning
|| (hdfsTimeout > 0
&& localstart + hdfsTimeout < Time.monotonicNow())) {
String msg = "Unable to close file because dfsclient " +
" was unable to contact the HDFS servers." +
" clientRunning " + dfsClient.clientRunning +
@ -2284,7 +2282,7 @@ private void completeFile(ExtendedBlock last) throws IOException {
retries--;
Thread.sleep(sleeptime);
sleeptime *= 2;
if (Time.now() - localstart > 5000) {
if (Time.monotonicNow() - localstart > 5000) {
DFSClient.LOG.info("Could not complete " + src + " retrying...");
}
} catch (InterruptedException ie) {

View File

@ -278,7 +278,7 @@ synchronized String getDaemonName() {
/** Is the empty period longer than the grace period? */
private synchronized boolean isRenewerExpired() {
return emptyTime != Long.MAX_VALUE
&& Time.now() - emptyTime > gracePeriod;
&& Time.monotonicNow() - emptyTime > gracePeriod;
}
synchronized void put(final long inodeId, final DFSOutputStream out,
@ -346,7 +346,7 @@ void closeFile(final long inodeId, final DFSClient dfsc) {
}
}
//discover the first time that all file-being-written maps are empty.
emptyTime = Time.now();
emptyTime = Time.monotonicNow();
}
}
}
@ -361,7 +361,7 @@ synchronized void closeClient(final DFSClient dfsc) {
}
if (emptyTime == Long.MAX_VALUE) {
//discover the first time that the client list is empty.
emptyTime = Time.now();
emptyTime = Time.monotonicNow();
}
}
@ -434,9 +434,9 @@ public int compare(final DFSClient left, final DFSClient right) {
* when the lease period is half over.
*/
private void run(final int id) throws InterruptedException {
for(long lastRenewed = Time.now(); !Thread.interrupted();
for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
Thread.sleep(getSleepPeriod())) {
final long elapsed = Time.now() - lastRenewed;
final long elapsed = Time.monotonicNow() - lastRenewed;
if (elapsed >= getRenewalTime()) {
try {
renew();
@ -444,7 +444,7 @@ private void run(final int id) throws InterruptedException {
LOG.debug("Lease renewer daemon for " + clientsString()
+ " with renew id " + id + " executed");
}
lastRenewed = Time.now();
lastRenewed = Time.monotonicNow();
} catch (SocketTimeoutException ie) {
LOG.warn("Failed to renew lease for " + clientsString() + " for "
+ (elapsed/1000) + " seconds. Aborting ...", ie);
@ -479,7 +479,7 @@ private void run(final int id) throws InterruptedException {
// registered with this renewer, stop the daemon after the grace
// period.
if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
emptyTime = Time.now();
emptyTime = Time.monotonicNow();
}
}
}

View File

@ -49,6 +49,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
private long cacheCapacity;
private long cacheUsed;
private long lastUpdate;
private long lastUpdateMonotonic;
private int xceiverCount;
private String location = NetworkTopology.DEFAULT_RACK;
private String softwareVersion;
@ -91,6 +92,7 @@ public DatanodeInfo(DatanodeInfo from) {
this.cacheCapacity = from.getCacheCapacity();
this.cacheUsed = from.getCacheUsed();
this.lastUpdate = from.getLastUpdate();
this.lastUpdateMonotonic = from.getLastUpdateMonotonic();
this.xceiverCount = from.getXceiverCount();
this.location = from.getNetworkLocation();
this.adminState = from.getAdminState();
@ -105,6 +107,7 @@ public DatanodeInfo(DatanodeID nodeID) {
this.cacheCapacity = 0L;
this.cacheUsed = 0L;
this.lastUpdate = 0L;
this.lastUpdateMonotonic = 0L;
this.xceiverCount = 0;
this.adminState = null;
}
@ -117,12 +120,13 @@ public DatanodeInfo(DatanodeID nodeID, String location) {
public DatanodeInfo(DatanodeID nodeID, String location,
final long capacity, final long dfsUsed, final long remaining,
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
final long lastUpdate, final int xceiverCount,
final AdminStates adminState) {
final long lastUpdate, final long lastUpdateMonotonic,
final int xceiverCount, final AdminStates adminState) {
this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getDatanodeUuid(),
nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getInfoSecurePort(),
nodeID.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed,
cacheCapacity, cacheUsed, lastUpdate, xceiverCount, location, adminState);
cacheCapacity, cacheUsed, lastUpdate, lastUpdateMonotonic,
xceiverCount, location, adminState);
}
/** Constructor */
@ -131,8 +135,9 @@ public DatanodeInfo(final String ipAddr, final String hostName,
final int infoSecurePort, final int ipcPort,
final long capacity, final long dfsUsed, final long remaining,
final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
final long lastUpdate, final int xceiverCount,
final String networkLocation, final AdminStates adminState) {
final long lastUpdate, final long lastUpdateMonotonic,
final int xceiverCount, final String networkLocation,
final AdminStates adminState) {
super(ipAddr, hostName, datanodeUuid, xferPort, infoPort,
infoSecurePort, ipcPort);
this.capacity = capacity;
@ -142,6 +147,7 @@ public DatanodeInfo(final String ipAddr, final String hostName,
this.cacheCapacity = cacheCapacity;
this.cacheUsed = cacheUsed;
this.lastUpdate = lastUpdate;
this.lastUpdateMonotonic = lastUpdateMonotonic;
this.xceiverCount = xceiverCount;
this.location = networkLocation;
this.adminState = adminState;
@ -222,9 +228,26 @@ public float getCacheRemainingPercent() {
return DFSUtil.getPercentRemaining(getCacheRemaining(), cacheCapacity);
}
/** The time when this information was accurate. */
/**
* Get the last update timestamp.
* Return value is suitable for Date conversion.
*/
public long getLastUpdate() { return lastUpdate; }
/**
* The time when this information was accurate. <br>
* The return value is suitable for computing time differences,
* and should not be used to convert to a Date.
*/
public long getLastUpdateMonotonic() { return lastUpdateMonotonic; }
/**
* Set lastUpdate monotonic time
*/
public void setLastUpdateMonotonic(long lastUpdateMonotonic) {
this.lastUpdateMonotonic = lastUpdateMonotonic;
}
/** number of active connections */
public int getXceiverCount() { return xceiverCount; }
@ -434,7 +457,7 @@ public AdminStates getAdminState() {
* @return true if the node is stale
*/
public boolean isStale(long staleInterval) {
return (Time.now() - lastUpdate) >= staleInterval;
return (Time.monotonicNow() - lastUpdateMonotonic) >= staleInterval;
}
/**

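DatanodeInfo now carries two timestamps with distinct jobs: lastUpdate (wall clock) survives for display and serialization, while the new lastUpdateMonotonic feeds interval arithmetic such as isStale() above. A reduced sketch of the split, using JDK clocks in place of the Hadoop Time utility (the class is illustrative, not the real DatanodeInfo):

class NodeInfo {
  private long lastUpdate;          // wall clock, as Time.now(): display only
  private long lastUpdateMonotonic; // monotonic, as Time.monotonicNow(): math

  void onHeartbeat() {
    lastUpdate = System.currentTimeMillis();
    lastUpdateMonotonic = System.nanoTime() / 1_000_000L;
  }

  // Staleness is a time difference, so it uses the monotonic reading.
  boolean isStale(long staleIntervalMs) {
    long nowMono = System.nanoTime() / 1_000_000L;
    return (nowMono - lastUpdateMonotonic) >= staleIntervalMs;
  }

  // Human-readable output keeps the wall-clock reading.
  String lastSeen() {
    return new java.util.Date(lastUpdate).toString();
  }
}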
View File

@ -643,8 +643,8 @@ static public DatanodeInfo convert(DatanodeInfoProto di) {
di.hasLocation() ? di.getLocation() : null ,
di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
di.getLastUpdate(), di.getXceiverCount(),
PBHelper.convert(di.getAdminState()));
di.getLastUpdate(), di.getLastUpdateMonotonic(),
di.getXceiverCount(), PBHelper.convert(di.getAdminState()));
}
static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {
@ -705,6 +705,7 @@ public static DatanodeInfoProto convert(DatanodeInfo info) {
.setCacheCapacity(info.getCacheCapacity())
.setCacheUsed(info.getCacheUsed())
.setLastUpdate(info.getLastUpdate())
.setLastUpdateMonotonic(info.getLastUpdateMonotonic())
.setXceiverCount(info.getXceiverCount())
.setAdminState(PBHelper.convert(info.getAdminState()))
.build();

View File

@ -673,7 +673,7 @@ static class Cli extends Configured implements Tool {
*/
@Override
public int run(String[] args) {
final long startTime = Time.now();
final long startTime = Time.monotonicNow();
final Configuration conf = getConf();
try {
@ -688,8 +688,10 @@ public int run(String[] args) {
System.out.println(e + ". Exiting ...");
return ExitStatus.INTERRUPTED.getExitCode();
} finally {
System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
System.out.println("Balancing took " + time2Str(Time.now()-startTime));
System.out.format("%-24s ",
DateFormat.getDateTimeInstance().format(new Date()));
System.out.println("Balancing took "
+ time2Str(Time.monotonicNow() - startTime));
}
}

View File

@ -315,7 +315,8 @@ public void initializeBlockRecovery(long recoveryId) {
continue;
}
final ReplicaUnderConstruction ruc = replicas.get(i);
final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate();
final long lastUpdate = ruc.getExpectedStorageLocation()
.getDatanodeDescriptor().getLastUpdateMonotonic();
if (lastUpdate > mostRecentLastUpdate) {
primaryNodeIndex = i;
primary = ruc;

View File

@ -1775,7 +1775,7 @@ public boolean processReport(final DatanodeID nodeID,
final DatanodeStorage storage,
final BlockListAsLongs newReport) throws IOException {
namesystem.writeLock();
final long startTime = Time.now(); //after acquiring write lock
final long startTime = Time.monotonicNow(); //after acquiring write lock
final long endTime;
DatanodeDescriptor node;
Collection<Block> invalidatedBlocks = null;
@ -1813,7 +1813,7 @@ public boolean processReport(final DatanodeID nodeID,
storageInfo.receivedBlockReport();
} finally {
endTime = Time.now();
endTime = Time.monotonicNow();
namesystem.writeUnlock();
}
@ -1843,7 +1843,7 @@ void rescanPostponedMisreplicatedBlocks() {
if (getPostponedMisreplicatedBlocksCount() == 0) {
return;
}
long startTimeRescanPostponedMisReplicatedBlocks = Time.now();
long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
long startPostponedMisReplicatedBlocksCount =
getPostponedMisreplicatedBlocksCount();
namesystem.writeLock();
@ -1903,7 +1903,7 @@ void rescanPostponedMisreplicatedBlocks() {
long endPostponedMisReplicatedBlocksCount =
getPostponedMisreplicatedBlocksCount();
LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
(Time.now() - startTimeRescanPostponedMisReplicatedBlocks) +
(Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) +
" msecs. " + endPostponedMisReplicatedBlocksCount +
" blocks are left. " + (startPostponedMisReplicatedBlocksCount -
endPostponedMisReplicatedBlocksCount) + " blocks are removed.");
@ -2651,7 +2651,7 @@ private void stopReplicationInitializer() {
private void processMisReplicatesAsync() throws InterruptedException {
long nrInvalid = 0, nrOverReplicated = 0;
long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0;
long startTimeMisReplicatedScan = Time.now();
long startTimeMisReplicatedScan = Time.monotonicNow();
Iterator<BlockInfoContiguous> blocksItr = blocksMap.getBlocks().iterator();
long totalBlocks = blocksMap.size();
replicationQueuesInitProgress = 0;
@ -2709,7 +2709,8 @@ private void processMisReplicatesAsync() throws InterruptedException {
NameNode.stateChangeLog
.info("STATE* Replication Queue initialization "
+ "scan for invalid, over- and under-replicated blocks "
+ "completed in " + (Time.now() - startTimeMisReplicatedScan)
+ "completed in "
+ (Time.monotonicNow() - startTimeMisReplicatedScan)
+ " msec");
break;
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.util.*;
@ -884,7 +884,7 @@ public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
Collection<DatanodeStorageInfo> second,
final List<StorageType> excessTypes) {
long oldestHeartbeat =
now() - heartbeatInterval * tolerateHeartbeatMultiplier;
monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
DatanodeStorageInfo oldestHeartbeatStorage = null;
long minSpace = Long.MAX_VALUE;
DatanodeStorageInfo minSpaceStorage = null;
@ -898,8 +898,8 @@ public DatanodeStorageInfo chooseReplicaToDelete(BlockCollection bc,
final DatanodeDescriptor node = storage.getDatanodeDescriptor();
long free = node.getRemaining();
long lastHeartbeat = node.getLastUpdate();
if(lastHeartbeat < oldestHeartbeat) {
long lastHeartbeat = node.getLastUpdateMonotonic();
if (lastHeartbeat < oldestHeartbeat) {
oldestHeartbeat = lastHeartbeat;
oldestHeartbeatStorage = storage;
}

View File

@ -407,7 +407,8 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
setCacheCapacity(cacheCapacity);
setCacheUsed(cacheUsed);
setXceiverCount(xceiverCount);
setLastUpdate(Time.now());
setLastUpdateMonotonic(Time.monotonicNow());
this.volumeFailures = volFailures;
this.volumeFailureSummary = volumeFailureSummary;
for (StorageReport report : reports) {
@ -422,7 +423,7 @@ public void updateHeartbeatState(StorageReport[] reports, long cacheCapacity,
totalBlockPoolUsed += report.getBlockPoolUsed();
totalDfsUsed += report.getDfsUsed();
}
rollBlocksScheduled(getLastUpdate());
rollBlocksScheduled(getLastUpdateMonotonic());
// Update total metrics for the node.
setCapacity(totalCapacity);

View File

@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.util.Time.monotonicNow;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;
@ -43,7 +45,6 @@
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Time;
import java.io.IOException;
import java.io.PrintWriter;
@ -581,8 +582,8 @@ void removeDeadDatanode(final DatanodeID nodeID) {
/** Is the datanode dead? */
boolean isDatanodeDead(DatanodeDescriptor node) {
return (node.getLastUpdate() <
(Time.now() - heartbeatExpireInterval));
return (node.getLastUpdateMonotonic() <
(monotonicNow() - heartbeatExpireInterval));
}
/** Add a datanode. */
@ -1298,7 +1299,7 @@ public List<DatanodeDescriptor> getDatanodeListForReport(
.getAddress().getHostAddress(), addr.getHostName(), "",
addr.getPort() == 0 ? defaultXferPort : addr.getPort(),
defaultInfoPort, defaultInfoSecurePort, defaultIpcPort));
dn.setLastUpdate(0); // Consider this node dead for reporting
setDatanodeDead(dn);
nodes.add(dn);
}
}
@ -1331,6 +1332,7 @@ private static boolean isNameResolved(InetAddress address) {
private void setDatanodeDead(DatanodeDescriptor node) {
node.setLastUpdate(0);
node.setLastUpdateMonotonic(0);
}
/** Handle heartbeat from datanodes. */
@ -1436,7 +1438,7 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
blockPoolId, blks));
}
boolean sendingCachingCommands = false;
long nowMs = Time.monotonicNow();
long nowMs = monotonicNow();
if (shouldSendCachingCommands &&
((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
timeBetweenResendingCachingDirectivesMs)) {

View File

@ -45,7 +45,7 @@
import org.slf4j.LoggerFactory;
import static com.google.common.base.Preconditions.checkArgument;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
/**
* Manages datanode decommissioning. A background monitor thread
@ -208,7 +208,7 @@ public void startDecommission(DatanodeDescriptor node) {
}
// Update DN stats maintained by HeartbeatManager
hbManager.startDecommission(node);
node.decommissioningStatus.setStartTime(now());
node.decommissioningStatus.setStartTime(monotonicNow());
pendingNodes.add(node);
}
} else {

View File

@ -353,7 +353,7 @@ private class Monitor implements Runnable {
public void run() {
while(namesystem.isRunning()) {
try {
final long now = Time.now();
final long now = Time.monotonicNow();
if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
heartbeatCheck();
lastHeartbeatCheck = now;

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.PrintWriter;
import java.sql.Time;
@ -177,7 +177,7 @@ static class PendingBlockInfo {
private final List<DatanodeDescriptor> targets;
PendingBlockInfo(DatanodeDescriptor[] targets) {
this.timeStamp = now();
this.timeStamp = monotonicNow();
this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
: new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
}
@ -187,7 +187,7 @@ long getTimeStamp() {
}
void setTimeStamp() {
timeStamp = now();
timeStamp = monotonicNow();
}
void incrementReplicas(DatanodeDescriptor... newTargets) {
@ -234,7 +234,7 @@ void pendingReplicationCheck() {
synchronized (pendingReplications) {
Iterator<Map.Entry<Block, PendingBlockInfo>> iter =
pendingReplications.entrySet().iterator();
long now = now();
long now = monotonicNow();
if(LOG.isDebugEnabled()) {
LOG.debug("PendingReplicationMonitor checking Q");
}

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.EOFException;
import java.io.IOException;
@ -57,7 +57,6 @@
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;
@ -249,7 +248,7 @@ void scheduleHeartbeat() {
*/
void scheduleBlockReport(long delay) {
if (delay > 0) { // send BR after random delay
lastBlockReport = Time.now()
lastBlockReport = monotonicNow()
- ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
} else { // send at next heartbeat
lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;
@ -291,14 +290,14 @@ private void reportReceivedDeletedBlocks() throws IOException {
// Send incremental block reports to the Namenode outside the lock
boolean success = false;
final long startTime = Time.monotonicNow();
final long startTime = monotonicNow();
try {
bpNamenode.blockReceivedAndDeleted(bpRegistration,
bpos.getBlockPoolId(),
reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
success = true;
} finally {
dn.getMetrics().addIncrementalBlockReport(Time.monotonicNow()-startTime);
dn.getMetrics().addIncrementalBlockReport(monotonicNow() - startTime);
if (!success) {
synchronized (pendingIncrementalBRperStorage) {
for (StorageReceivedDeletedBlocks report : reports) {
@ -442,7 +441,7 @@ boolean hasPendingIBR() {
*/
List<DatanodeCommand> blockReport() throws IOException {
// send block report if timer has expired.
final long startTime = now();
final long startTime = monotonicNow();
if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
return null;
}
@ -456,7 +455,7 @@ List<DatanodeCommand> blockReport() throws IOException {
reportReceivedDeletedBlocks();
lastDeletedReport = startTime;
long brCreateStartTime = now();
long brCreateStartTime = monotonicNow();
Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());
@ -476,7 +475,7 @@ List<DatanodeCommand> blockReport() throws IOException {
int numReportsSent = 0;
int numRPCs = 0;
boolean success = false;
long brSendStartTime = now();
long brSendStartTime = monotonicNow();
try {
if (totalBlockCount < dnConf.blockReportSplitThreshold) {
// Below split threshold, send all reports in a single message.
@ -503,7 +502,7 @@ List<DatanodeCommand> blockReport() throws IOException {
success = true;
} finally {
// Log the block report processing stats from Datanode perspective
long brSendCost = now() - brSendStartTime;
long brSendCost = monotonicNow() - brSendStartTime;
long brCreateCost = brSendStartTime - brCreateStartTime;
dn.getMetrics().addBlockReport(brSendCost);
final int nCmds = cmds.size();
@ -539,7 +538,7 @@ private void scheduleNextBlockReport(long previousReportStartTime) {
* 1) normal like 9:20:18, next report should be at 10:20:14
* 2) unexpected like 11:35:43, next report should be at 12:20:14
*/
lastBlockReport += (now() - lastBlockReport) /
lastBlockReport += (monotonicNow() - lastBlockReport) /
dnConf.blockReportInterval * dnConf.blockReportInterval;
}
}
@ -551,7 +550,7 @@ DatanodeCommand cacheReport() throws IOException {
}
// send cache report if timer has expired.
DatanodeCommand cmd = null;
final long startTime = Time.monotonicNow();
final long startTime = monotonicNow();
if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
if (LOG.isDebugEnabled()) {
LOG.debug("Sending cacheReport from service actor: " + this);
@ -560,10 +559,10 @@ DatanodeCommand cacheReport() throws IOException {
String bpid = bpos.getBlockPoolId();
List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
long createTime = Time.monotonicNow();
long createTime = monotonicNow();
cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
long sendTime = Time.monotonicNow();
long sendTime = monotonicNow();
long createCost = createTime - startTime;
long sendCost = sendTime - createTime;
dn.getMetrics().addCacheReport(sendCost);
@ -670,7 +669,7 @@ private void offerService() throws Exception {
//
while (shouldRun()) {
try {
final long startTime = now();
final long startTime = monotonicNow();
//
// Every so often, send heartbeat or block-report
@ -687,7 +686,7 @@ private void offerService() throws Exception {
if (!dn.areHeartbeatsDisabledForTests()) {
HeartbeatResponse resp = sendHeartBeat();
assert resp != null;
dn.getMetrics().addHeartbeat(now() - startTime);
dn.getMetrics().addHeartbeat(monotonicNow() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE)
// then let the BPOfferService update itself.
@ -703,10 +702,10 @@ private void offerService() throws Exception {
handleRollingUpgradeStatus(resp);
}
long startProcessCommands = now();
long startProcessCommands = monotonicNow();
if (!processCommand(resp.getCommands()))
continue;
long endProcessCommands = now();
long endProcessCommands = monotonicNow();
if (endProcessCommands - startProcessCommands > 2000) {
LOG.info("Took " + (endProcessCommands - startProcessCommands)
+ "ms to process " + resp.getCommands().length
@ -731,7 +730,7 @@ private void offerService() throws Exception {
// or work arrives, and then iterate again.
//
long waitTime = dnConf.heartBeatInterval -
(Time.now() - lastHeartbeat);
(monotonicNow() - lastHeartbeat);
synchronized(pendingIncrementalBRperStorage) {
if (waitTime > 0 && !sendImmediateIBR) {
try {

View File

@ -25,7 +25,7 @@
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@ -247,7 +247,7 @@ public void run() {
peer.setReadTimeout(dnConf.socketTimeout);
}
opStartTime = now();
opStartTime = monotonicNow();
processOp(op);
++opsProcessed;
} while ((peer != null) &&
@ -1202,7 +1202,7 @@ public void replaceBlock(final ExtendedBlock block,
}
private long elapsed() {
return now() - opStartTime;
return monotonicNow() - opStartTime;
}
/**

View File

@ -19,7 +19,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.File;
import java.io.IOException;
@ -135,11 +135,11 @@ public void run() {
long lastCheckpointTime = 0;
if (!backupNode.shouldCheckpointAtStartup()) {
lastCheckpointTime = now();
lastCheckpointTime = monotonicNow();
}
while(shouldRun) {
try {
long now = now();
long now = monotonicNow();
boolean shouldCheckpoint = false;
if(now >= lastCheckpointTime + periodMSec) {
shouldCheckpoint = true;
@ -182,7 +182,7 @@ void doCheckpoint() throws IOException {
BackupImage bnImage = getFSImage();
NNStorage bnStorage = bnImage.getStorage();
long startTime = now();
long startTime = monotonicNow();
bnImage.freezeNamespaceAtNextRoll();
NamenodeCommand cmd =
@ -276,7 +276,7 @@ void doCheckpoint() throws IOException {
long imageSize = bnImage.getStorage().getFsImageName(txid).length();
LOG.info("Checkpoint completed in "
+ (now() - startTime)/1000 + " seconds."
+ (monotonicNow() - startTime)/1000 + " seconds."
+ " New Image Size: " + imageSize);
}

View File

@ -20,7 +20,7 @@
import java.io.IOException;
import java.io.Closeable;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
@ -109,9 +109,9 @@ public void flush() throws IOException {
public void flush(boolean durable) throws IOException {
numSync++;
long start = now();
long start = monotonicNow();
flushAndSync(durable);
long end = now();
long end = monotonicNow();
totalTimeSync += (end - start);
}

View File

@ -18,7 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.ExitUtil.terminate;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.lang.reflect.Constructor;
@ -231,7 +231,7 @@ protected synchronized TransactionId initialValue() {
this.conf = conf;
this.storage = storage;
metrics = NameNode.getNameNodeMetrics();
lastPrintTime = now();
lastPrintTime = monotonicNow();
// If this list is empty, an error will be thrown on first use
// of the editlog, as no journals will exist
@ -487,14 +487,14 @@ private long beginTransaction() {
//
TransactionId id = myTransactionId.get();
id.txid = txid;
return now();
return monotonicNow();
}
private void endTransaction(long start) {
assert Thread.holdsLock(this);
// update statistics
long end = now();
long end = monotonicNow();
numTransactions++;
totalTimeTransactions += (end-start);
if (metrics != null) // Metrics is non-null only when used inside name node
@ -641,7 +641,7 @@ public void logSync() {
}
// do the sync
long start = now();
long start = monotonicNow();
try {
if (logStream != null) {
logStream.flush();
@ -658,7 +658,7 @@ public void logSync() {
terminate(1, msg);
}
}
long elapsed = now() - start;
long elapsed = monotonicNow() - start;
if (metrics != null) { // Metrics non-null only when used inside name node
metrics.addSync(elapsed);
@ -680,7 +680,7 @@ public void logSync() {
// print statistics every 1 minute.
//
private void printStatistics(boolean force) {
long now = now();
long now = monotonicNow();
if (lastPrintTime + 60000 > now && !force) {
return;
}

View File

@ -19,7 +19,7 @@
import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.FilterInputStream;
import java.io.IOException;
@ -138,13 +138,13 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
prog.beginStep(Phase.LOADING_EDITS, step);
fsNamesys.writeLock();
try {
long startTime = now();
long startTime = monotonicNow();
FSImage.LOG.info("Start loading edits file " + edits.getName());
long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
startOpt, recovery);
FSImage.LOG.info("Edits file " + edits.getName()
+ " of size " + edits.length() + " edits # " + numEdits
+ " loaded in " + (now()-startTime)/1000 + " seconds");
+ " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
return numEdits;
} finally {
edits.close();
@ -179,7 +179,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
Step step = createStartupProgressStep(in);
prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
long lastLogTime = now();
long lastLogTime = monotonicNow();
long lastInodeId = fsNamesys.dir.getLastInodeId();
try {
@ -259,7 +259,7 @@ long loadEditRecords(EditLogInputStream in, boolean closeOnExit,
}
// log progress
if (op.hasTransactionId()) {
long now = now();
long now = monotonicNow();
if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
int percent = Math.round((float) deltaTxId / numTxns * 100);

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.DataInput;
import java.io.DataInputStream;
@ -310,7 +310,7 @@ public void load(File curFile) throws IOException {
StartupProgress prog = NameNode.getStartupProgress();
Step step = new Step(StepType.INODES);
prog.beginStep(Phase.LOADING_FSIMAGE, step);
long startTime = now();
long startTime = monotonicNow();
//
// Load in bits
@ -442,8 +442,9 @@ public void load(File curFile) throws IOException {
imgDigest = new MD5Hash(digester.digest());
loaded = true;
LOG.info("Image file " + curFile + " of size " + curFile.length() +
" bytes loaded in " + (now() - startTime)/1000 + " seconds.");
LOG.info("Image file " + curFile + " of size " + curFile.length()
+ " bytes loaded in " + (monotonicNow() - startTime) / 1000
+ " seconds.");
}
/** Update the root node's attributes */
@ -1245,7 +1246,7 @@ void save(File newFile, FSImageCompression compression) throws IOException {
prog.beginStep(Phase.SAVING_CHECKPOINT, step);
prog.setTotal(Phase.SAVING_CHECKPOINT, step, numINodes);
Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
long startTime = now();
long startTime = monotonicNow();
//
// Write out data
//
@ -1313,8 +1314,9 @@ void save(File newFile, FSImageCompression compression) throws IOException {
// set md5 of the saved image
savedDigest = new MD5Hash(digester.digest());
LOG.info("Image file " + newFile + " of size " + newFile.length() +
" bytes saved in " + (now() - startTime)/1000 + " seconds.");
LOG.info("Image file " + newFile + " of size " + newFile.length()
+ " bytes saved in " + (monotonicNow() - startTime) / 1000
+ " seconds.");
}
/**

View File

@ -90,6 +90,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SUPPORT_APPEND_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;
@ -281,7 +282,6 @@
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;
@ -672,7 +672,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
}
long loadStart = now();
long loadStart = monotonicNow();
try {
namesystem.loadFSImage(startOpt);
} catch (IOException ioe) {
@ -680,7 +680,7 @@ static FSNamesystem loadFromDisk(Configuration conf) throws IOException {
fsImage.close();
throw ioe;
}
long timeTakenToLoadFSImage = now() - loadStart;
long timeTakenToLoadFSImage = monotonicNow() - loadStart;
LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
if (nnMetrics != null) {
@ -5071,6 +5071,7 @@ public class SafeModeInfo {
* <br> >0 safe mode is on, but we are in extension period
*/
private long reached = -1;
private long reachedTimestamp = -1;
/** Total number of blocks. */
int blockTotal;
/** Number of safe blocks. */
@ -5171,6 +5172,7 @@ private synchronized boolean isOn() {
*/
private void enter() {
this.reached = 0;
this.reachedTimestamp = 0;
}
/**
@ -5194,6 +5196,7 @@ private synchronized void leave() {
NameNode.stateChangeLog.info("STATE* Safe mode is OFF");
}
reached = -1;
reachedTimestamp = -1;
safeMode = null;
final NetworkTopology nt = blockManager.getDatanodeManager().getNetworkTopology();
NameNode.stateChangeLog.info("STATE* Network topology has "
@ -5232,7 +5235,7 @@ private synchronized boolean canLeave() {
return false;
}
if (now() - reached < extension) {
if (monotonicNow() - reached < extension) {
reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
return false;
}
@ -5288,7 +5291,8 @@ private void checkMode() {
return;
}
// start monitor
reached = now();
reached = monotonicNow();
reachedTimestamp = now();
if (smmthread == null) {
smmthread = new Daemon(new SafeModeMonitor());
smmthread.start();
@ -5435,8 +5439,8 @@ String getTurnOffTip() {
if (!thresholdsMet) {
msg += "once the thresholds have been reached.";
} else if (reached + extension - now() > 0) {
msg += ("in " + (reached + extension - now()) / 1000 + " seconds.");
} else if (reached + extension - monotonicNow() > 0) {
msg += ("in " + (reached + extension - monotonicNow()) / 1000 + " seconds.");
} else {
msg += "soon.";
}
@ -5462,7 +5466,7 @@ public String toString() {
+ ". Target blocks = " + blockThreshold + " for threshold = %" + threshold
+ ". Minimal replication = " + safeReplication + ".";
if (reached > 0)
resText += " Threshold was reached " + new Date(reached) + ".";
resText += " Threshold was reached " + new Date(reachedTimestamp) + ".";
return resText;
}
@ -5941,7 +5945,7 @@ public String getHAState() {
@Metric
public long getMillisSinceLastLoadedEdits() {
if (isInStandbyState() && editLogTailer != null) {
return now() - editLogTailer.getLastLoadTimestamp();
return monotonicNow() - editLogTailer.getLastLoadTimeMs();
} else {
return 0;
}
@ -6983,7 +6987,7 @@ public String getDecomNodes() {
}
private long getLastContact(DatanodeDescriptor alivenode) {
return (Time.now() - alivenode.getLastUpdate())/1000;
return (monotonicNow() - alivenode.getLastUpdateMonotonic())/1000;
}
private long getDfsUsed(DatanodeDescriptor alivenode) {

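SafeModeInfo gets the same two-clock split as DatanodeInfo: reached moves to the monotonic clock for the extension-period arithmetic in canLeave() and getTurnOffTip(), while the new reachedTimestamp keeps a wall-clock copy solely for the new Date(...) rendering in toString(). A compact sketch of that division of labor (the class is illustrative):

class SafeModeTimer {
  private long reached = -1;          // monotonic ms, for interval math
  private long reachedTimestamp = -1; // wall-clock ms, for Date display only

  void markThresholdReached() {
    reached = System.nanoTime() / 1_000_000L;      // as Time.monotonicNow()
    reachedTimestamp = System.currentTimeMillis(); // as Time.now()
  }

  boolean extensionElapsed(long extensionMs) {
    return System.nanoTime() / 1_000_000L - reached >= extensionMs;
  }

  @Override
  public String toString() {
    return reached > 0
        ? "reached at " + new java.util.Date(reachedTimestamp)
        : "not reached";
  }
}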
View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import java.io.IOException;
import java.util.ArrayList;
@ -256,17 +256,17 @@ private Lease(String holder) {
}
/** Only LeaseManager object can renew a lease */
private void renew() {
this.lastUpdate = now();
this.lastUpdate = monotonicNow();
}
/** @return true if the Hard Limit Timer has expired */
public boolean expiredHardLimit() {
return now() - lastUpdate > hardLimit;
return monotonicNow() - lastUpdate > hardLimit;
}
/** @return true if the Soft Limit Timer has expired */
public boolean expiredSoftLimit() {
return now() - lastUpdate > softLimit;
return monotonicNow() - lastUpdate > softLimit;
}
/** Does this lease contain any path? */

View File

@ -287,7 +287,7 @@ public void blockIdCK(String blockId) {
* Check files on DFS, starting from the indicated path.
*/
public void fsck() {
final long startTime = Time.now();
final long startTime = Time.monotonicNow();
try {
if(blockIds != null) {
@ -355,7 +355,7 @@ public void fsck() {
}
out.println("FSCK ended at " + new Date() + " in "
+ (Time.now() - startTime + " milliseconds"));
+ (Time.monotonicNow() - startTime + " milliseconds"));
// If there were internal errors during the fsck operation, we want to
// return FAILURE_STATUS, even if those errors were not immediately
@ -381,7 +381,7 @@ public void fsck() {
String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS;
LOG.warn(errMsg, e);
out.println("FSCK ended at " + new Date() + " in "
+ (Time.now() - startTime + " milliseconds"));
+ (Time.monotonicNow() - startTime + " milliseconds"));
out.println(e.getMessage());
out.print("\n\n" + errMsg);
} finally {

View File

@ -44,7 +44,7 @@
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.util.ExitUtil.terminate;
import com.google.common.annotations.VisibleForTesting;
@ -84,7 +84,7 @@ public class EditLogTailer {
* The last time we successfully loaded a non-zero number of edits from the
* shared directory.
*/
private long lastLoadTimestamp;
private long lastLoadTimeMs;
/**
* How often the Standby should roll edit logs. Since the Standby only reads
@ -105,7 +105,7 @@ public EditLogTailer(FSNamesystem namesystem, Configuration conf) {
this.namesystem = namesystem;
this.editLog = namesystem.getEditLog();
lastLoadTimestamp = now();
lastLoadTimeMs = monotonicNow();
logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000;
@ -241,7 +241,7 @@ void doTailEdits() throws IOException, InterruptedException {
}
if (editsLoaded > 0) {
lastLoadTimestamp = now();
lastLoadTimeMs = monotonicNow();
}
lastLoadedTxnId = image.getLastAppliedTxId();
} finally {
@ -250,10 +250,10 @@ void doTailEdits() throws IOException, InterruptedException {
}
/**
* @return timestamp (in msec) of when we last loaded a non-zero number of edits.
* @return time in msec of when we last loaded a non-zero number of edits.
*/
public long getLastLoadTimestamp() {
return lastLoadTimestamp;
public long getLastLoadTimeMs() {
return lastLoadTimeMs;
}
/**
@ -261,7 +261,7 @@ public long getLastLoadTimestamp() {
*/
private boolean tooLongSinceLastLoad() {
return logRollPeriodMs >= 0 &&
(now() - lastLoadTimestamp) > logRollPeriodMs ;
(monotonicNow() - lastLoadTimeMs) > logRollPeriodMs ;
}
/**

View File

@ -281,6 +281,7 @@ static Map<String, Object> toJsonMap(final DatanodeInfo datanodeinfo) {
m.put("cacheCapacity", datanodeinfo.getCacheCapacity());
m.put("cacheUsed", datanodeinfo.getCacheUsed());
m.put("lastUpdate", datanodeinfo.getLastUpdate());
m.put("lastUpdateMonotonic", datanodeinfo.getLastUpdateMonotonic());
m.put("xceiverCount", datanodeinfo.getXceiverCount());
m.put("networkLocation", datanodeinfo.getNetworkLocation());
m.put("adminState", datanodeinfo.getAdminState().name());
@ -379,6 +380,7 @@ static DatanodeInfo toDatanodeInfo(final Map<?, ?> m)
getLong(m, "cacheCapacity", 0l),
getLong(m, "cacheUsed", 0l),
getLong(m, "lastUpdate", 0l),
getLong(m, "lastUpdateMonotonic", 0l),
getInt(m, "xceiverCount", 0),
getString(m, "networkLocation", ""),
AdminStates.valueOf(getString(m, "adminState", "NORMAL")));

View File

@ -97,6 +97,7 @@ message DatanodeInfoProto {
optional AdminState adminState = 10 [default = NORMAL];
optional uint64 cacheCapacity = 11 [default = 0];
optional uint64 cacheUsed = 12 [default = 0];
optional uint64 lastUpdateMonotonic = 13 [default = 0];
}
/**

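Declaring lastUpdateMonotonic as an optional field with a fresh tag (13) and a default of 0 keeps DatanodeInfoProto wire-compatible in both directions: an old sender simply never writes field 13, and the generated accessor returns the default when it is absent. A hedged sketch of what that means on the reader side, assuming the generated class org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto:

import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.DatanodeInfoProto;

class ProtoCompat {
  static long lastUpdateMonotonic(DatanodeInfoProto di) {
    // For an optional proto2 field, the generated hasXxx()/getXxx() pair is
    // always safe: when a pre-2.7 peer omits field 13,
    // getLastUpdateMonotonic() returns the declared default (0), so
    // PBHelper.convert() needs no version check.
    return di.hasLastUpdateMonotonic()
        ? di.getLastUpdateMonotonic()
        : 0L; // equivalent: the accessor already returns 0 when unset
  }
}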
View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
@ -89,6 +90,7 @@
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Level;
@ -1031,7 +1033,7 @@ public static DatanodeInfo getLocalDatanodeInfo(String ipAddr,
DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT,
1l, 2l, 3l, 4l, 0l, 0l, 5, 6, "local", adminState);
1l, 2l, 3l, 4l, 0l, 0l, 0l, 5, 6, "local", adminState);
}
public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,
@ -1583,9 +1585,11 @@ public static DatanodeDescriptor getExpectedPrimaryNode(NameNode nn,
// the one to be in charge of the synchronization / recovery protocol.
final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
DatanodeStorageInfo expectedPrimary = storages[0];
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate();
long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor()
.getLastUpdateMonotonic();
for (int i = 1; i < storages.length; i++) {
final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate();
final long lastUpdate = storages[i].getDatanodeDescriptor()
.getLastUpdateMonotonic();
if (lastUpdate > mostRecentLastUpdate) {
expectedPrimary = storages[i];
mostRecentLastUpdate = lastUpdate;
@ -1722,4 +1726,20 @@ public static boolean changeReplicaLength(MiniDFSCluster cluster,
LOG.info("failed to change length of block " + blk);
return false;
}
/**
* Set the datanode dead
*/
public static void setDatanodeDead(DatanodeInfo dn) {
dn.setLastUpdate(0);
dn.setLastUpdateMonotonic(0);
}
/**
* Update lastUpdate and lastUpdateMonotonic with some offset.
*/
public static void resetLastUpdatesWithOffset(DatanodeInfo dn, long offset) {
dn.setLastUpdate(Time.now() + offset);
dn.setLastUpdateMonotonic(Time.monotonicNow() + offset);
}
}

View File

@ -2070,7 +2070,7 @@ public synchronized boolean restartDataNode(
public void setDataNodeDead(DatanodeID dnId) throws IOException {
DatanodeDescriptor dnd =
NameNodeAdapter.getDatanode(getNamesystem(), dnId);
dnd.setLastUpdate(0L);
DFSTestUtil.setDatanodeDead(dnd);
BlockManagerTestUtil.checkHeartbeat(getNamesystem().getBlockManager());
}

View File

@ -134,7 +134,8 @@ public void testReadSelectNonStaleDatanode() throws Exception {
staleNodeInfo = cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager()
.getDatanode(staleNode.getDatanodeId());
staleNodeInfo.setLastUpdate(Time.now() - staleInterval - 1);
DFSTestUtil.resetLastUpdatesWithOffset(staleNodeInfo,
-(staleInterval + 1));
LocatedBlocks blocksAfterStale = client.getNamenode().getBlockLocations(
fileName.toString(), 0, blockSize);
@ -145,8 +146,7 @@ public void testReadSelectNonStaleDatanode() throws Exception {
// restart the staleNode's heartbeat
DataNodeTestUtils.setHeartbeatsDisabledForTests(staleNode, false);
// reset the first node as non-stale, so as to avoid two stale nodes
staleNodeInfo.setLastUpdate(Time.now());
DFSTestUtil.resetLastUpdatesWithOffset(staleNodeInfo, 0);
LocatedBlock lastBlock = client.getLocatedBlocks(fileName.toString(), 0,
Long.MAX_VALUE).getLastLocatedBlock();
nodes = lastBlock.getLocations();
@ -155,10 +155,10 @@ public void testReadSelectNonStaleDatanode() throws Exception {
staleNode = this.stopDataNodeHeartbeat(cluster, nodes[0].getHostName());
assertNotNull(staleNode);
// set the node as stale
cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager()
.getDatanode(staleNode.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
DatanodeDescriptor dnDesc = cluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(staleNode.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, -(staleInterval + 1));
LocatedBlock lastBlockAfterStale = client.getLocatedBlocks(
fileName.toString(), 0, Long.MAX_VALUE).getLastLocatedBlock();

View File

@ -78,7 +78,7 @@ private void waitForBlockReplication(String filename,
ClientProtocol namenode,
int expected, long maxWaitSec)
throws IOException {
long start = Time.now();
long start = Time.monotonicNow();
//wait for all the blocks to be replicated;
LOG.info("Checking for block replication for " + filename);
@ -103,7 +103,7 @@ private void waitForBlockReplication(String filename,
actual + ".");
if (maxWaitSec > 0 &&
(Time.now() - start) > (maxWaitSec * 1000)) {
(Time.monotonicNow() - start) > (maxWaitSec * 1000)) {
throw new IOException("Timedout while waiting for all blocks to " +
" be replicated for " + filename);
}

View File

@ -101,7 +101,7 @@ public void testLeaseAbort() throws Exception {
// call renewLease() manually.
// make it look like the soft limit has been exceeded.
LeaseRenewer originalRenewer = dfs.getLeaseRenewer();
dfs.lastLeaseRenewal = Time.now()
dfs.lastLeaseRenewal = Time.monotonicNow()
- HdfsConstants.LEASE_SOFTLIMIT_PERIOD - 1000;
try {
dfs.renewLease();
@ -117,7 +117,7 @@ public void testLeaseAbort() throws Exception {
}
// make it look like the hard limit has been exceeded.
dfs.lastLeaseRenewal = Time.now()
dfs.lastLeaseRenewal = Time.monotonicNow()
- HdfsConstants.LEASE_HARDLIMIT_PERIOD - 1000;
dfs.renewLease();

View File

@ -111,8 +111,8 @@ public Boolean answer(InvocationOnMock invocation) throws Throwable {
renewer.put(fileId, mockStream, MOCK_DFSCLIENT);
// Wait for lease to get renewed
long failTime = Time.now() + 5000;
while (Time.now() < failTime &&
long failTime = Time.monotonicNow() + 5000;
while (Time.monotonicNow() < failTime &&
leaseRenewalCount.get() == 0) {
Thread.sleep(50);
}
@ -193,11 +193,11 @@ public void testThreadName() throws Exception {
// Pretend to close the file
renewer.closeFile(fileId, MOCK_DFSCLIENT);
renewer.setEmptyTime(Time.now());
renewer.setEmptyTime(Time.monotonicNow());
// Should stop the renewer running within a few seconds
long failTime = Time.now() + 5000;
while (renewer.isRunning() && Time.now() < failTime) {
long failTime = Time.monotonicNow() + 5000;
while (renewer.isRunning() && Time.monotonicNow() < failTime) {
Thread.sleep(50);
}
Assert.assertFalse(renewer.isRunning());

View File

@ -333,7 +333,7 @@ boolean runParallelRead(int nFiles, int nWorkerEach, ReadWorkerHelper helper) th
}
// Start the workers and wait
long starttime = Time.now();
long starttime = Time.monotonicNow();
for (ReadWorker worker : workers) {
worker.start();
}
@ -343,7 +343,7 @@ boolean runParallelRead(int nFiles, int nWorkerEach, ReadWorkerHelper helper) th
worker.join();
} catch (InterruptedException ignored) { }
}
long endtime = Time.now();
long endtime = Time.monotonicNow();
// Cleanup
for (TestFileInfo testInfo : testInfoArr) {

View File

@ -281,7 +281,7 @@ private void waitForBlockReplication(String filename,
ClientProtocol namenode,
int expected, long maxWaitSec)
throws IOException {
long start = Time.now();
long start = Time.monotonicNow();
//wait for all the blocks to be replicated;
LOG.info("Checking for block replication for " + filename);
@ -307,7 +307,7 @@ private void waitForBlockReplication(String filename,
}
if (maxWaitSec > 0 &&
(Time.now() - start) > (maxWaitSec * 1000)) {
(Time.monotonicNow() - start) > (maxWaitSec * 1000)) {
throw new IOException("Timedout while waiting for all blocks to " +
" be replicated for " + filename);
}

View File

@ -262,7 +262,7 @@ static void waitForHeartBeat(long expectedUsedSpace,
throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.now() + timeout;
: Time.monotonicNow() + timeout;
while (true) {
long[] status = client.getStats();
@ -274,7 +274,7 @@ static void waitForHeartBeat(long expectedUsedSpace,
&& usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
break; //done
if (Time.now() > failtime) {
if (Time.monotonicNow() > failtime) {
throw new TimeoutException("Cluster failed to reached expected values of "
+ "totalSpace (current: " + status[0]
+ ", expected: " + expectedTotalSpace
@ -369,7 +369,7 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
int expectedExcludedNodes) throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.now() + timeout;
: Time.monotonicNow() + timeout;
if (!p.nodesToBeIncluded.isEmpty()) {
totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
}
@ -399,7 +399,7 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
}
if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
balanced = false;
if (Time.now() > failtime) {
if (Time.monotonicNow() > failtime) {
throw new TimeoutException(
"Rebalancing expected avg utilization to become "
+ avgUtilization + ", but on datanode " + datanode

View File

@ -186,7 +186,7 @@ public static void noticeDeadDatanode(NameNode nn, String dnName) {
Assert.assertNotNull("Could not find DN with name: " + dnName, theDND);
synchronized (hbm) {
theDND.setLastUpdate(0);
DFSTestUtil.setDatanodeDead(theDND);
hbm.heartbeatCheck();
}
} finally {

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.util.Time;
import org.junit.Test;
/**
@ -46,40 +47,34 @@ public void testInitializeBlockRecovery() throws Exception {
new DatanodeStorageInfo[] {s1, s2, s3});
// Recovery attempt #1.
long currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 3 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 2 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -3 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000);
blockInfo.initializeBlockRecovery(1);
BlockInfoContiguousUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #2.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 3 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
blockInfo.initializeBlockRecovery(2);
blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #3.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 3 * 1000);
currentTime = System.currentTimeMillis();
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
blockInfo.initializeBlockRecovery(3);
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);
// Recovery attempt #4.
// Reset everything. And again pick DN with most recent heart beat.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime);
currentTime = System.currentTimeMillis();
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
blockInfo.initializeBlockRecovery(3);
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;
@ -38,6 +39,7 @@
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.Time;
import org.junit.Test;
/**
@ -164,9 +166,9 @@ public void testHeartbeatBlockRecovery() throws Exception {
NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);
// Test with all alive nodes.
dd1.setLastUpdate(System.currentTimeMillis());
dd2.setLastUpdate(System.currentTimeMillis());
dd3.setLastUpdate(System.currentTimeMillis());
DFSTestUtil.resetLastUpdatesWithOffset(dd1, 0);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, 0);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
final DatanodeStorageInfo[] storages = {
dd1.getStorageInfos()[0],
dd2.getStorageInfos()[0],
@ -189,10 +191,10 @@ public void testHeartbeatBlockRecovery() throws Exception {
assertEquals(recoveringNodes[2], dd3);
// Test with one stale node.
dd1.setLastUpdate(System.currentTimeMillis());
DFSTestUtil.resetLastUpdatesWithOffset(dd1, 0);
// More than the default stale interval of 30 seconds.
dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
dd3.setLastUpdate(System.currentTimeMillis());
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -40 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
blockInfo = new BlockInfoContiguousUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
BlockUCState.UNDER_RECOVERY, storages);
@ -210,10 +212,10 @@ public void testHeartbeatBlockRecovery() throws Exception {
assertEquals(recoveringNodes[1], dd3);
// Test with all stale node.
dd1.setLastUpdate(System.currentTimeMillis() - 60 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -60 * 1000);
// More than the default stale interval of 30 seconds.
dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -40 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -80 * 1000);
blockInfo = new BlockInfoContiguousUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
BlockUCState.UNDER_RECOVERY, storages);
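
These three scenarios (all fresh, one stale, all stale) exercise the staleness cut-off, 30 seconds by default here. With both the check and the timestamps on the monotonic clock, the comparison becomes a pure elapsed-time test that a clock step cannot distort. Roughly, the check reduces to the following (a sketch, not the BlockManager code itself):

static boolean isStale(DatanodeDescriptor node, long staleIntervalMs) {
  // Elapsed time since the last heartbeat, measured on one clock.
  return Time.monotonicNow() - node.getLastUpdateMonotonic() > staleIntervalMs;
}

Offsets of -40 * 1000 and -80 * 1000 therefore land on the stale side of the 30-second default, while 0 stays fresh.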

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -142,7 +143,7 @@ public void testIncludeExcludeLists() throws IOException {
.DatanodeReportType.DEAD).size());
DatanodeDescriptor spam = new DatanodeDescriptor(new DatanodeID("127.0.0" +
".3", "127.0.0.3", "uuid-spam", 12345, 1020, 1021, 1022));
spam.setLastUpdate(0);
DFSTestUtil.setDatanodeDead(spam);
includedNodes.add(entry("127.0.0.3:12345"));
dnMap.put("uuid-spam", spam);
Assert.assertEquals(1, dm.getDatanodeListForReport(HdfsConstants

View File

@ -137,7 +137,7 @@ public void testNodeCount() throws Exception {
void initializeTimeout(long timeout) {
this.timeout = timeout;
this.failtime = Time.now()
this.failtime = Time.monotonicNow()
+ ((timeout <= 0) ? Long.MAX_VALUE : timeout);
}
@ -148,7 +148,7 @@ void checkTimeout(String testLabel) throws TimeoutException {
/* check for timeout, then wait for cycleTime msec */
void checkTimeout(String testLabel, long cycleTime) throws TimeoutException {
if (Time.now() > failtime) {
if (Time.monotonicNow() > failtime) {
throw new TimeoutException("Timeout: "
+ testLabel + " for block " + lastBlock + " after " + timeout
+ " msec. Last counts: live = " + lastNum.liveReplicas()

View File

@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -42,6 +42,7 @@
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.Time;
import org.junit.Test;
public class TestOverReplicatedBlocks {
@ -171,10 +172,10 @@ public void testChooseReplicaToDelete() throws Exception {
long waitTime = DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT * 1000 *
(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT + 1);
do {
nodeInfo =
namesystem.getBlockManager().getDatanodeManager().getDatanode(dnReg);
lastHeartbeat = nodeInfo.getLastUpdate();
} while(now() - lastHeartbeat < waitTime);
nodeInfo = namesystem.getBlockManager().getDatanodeManager()
.getDatanode(dnReg);
lastHeartbeat = nodeInfo.getLastUpdateMonotonic();
} while (monotonicNow() - lastHeartbeat < waitTime);
fs.setReplication(fileName, (short)3);
BlockLocation locs[] = fs.getFileBlockLocations(
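
Note that the loop changes both of its operands together: monotonicNow() is paired with getLastUpdateMonotonic(). The two clocks have unrelated origins (the monotonic one typically counts from boot), so mixing them, e.g. now() minus a monotonic timestamp, would produce a meaningless difference. The corrected wait reduces to this shape (a sketch, with a sleep added to avoid spinning):

static void waitForStaleHeartbeat(DatanodeDescriptor node, long waitMs)
    throws InterruptedException {
  // Both operands come from the monotonic clock; never mix clocks.
  while (Time.monotonicNow() - node.getLastUpdateMonotonic() < waitMs) {
    Thread.sleep(100);
  }
}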

View File

@ -48,6 +48,7 @@
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
@ -562,7 +563,7 @@ private boolean containsWithinRange(DatanodeDescriptor target,
@Test
public void testChooseTargetWithStaleNodes() throws Exception {
// Set dataNodes[0] as stale
dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1);
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[0], -(staleInterval + 1));
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
assertTrue(namenode.getNamesystem().getBlockManager()
@ -582,7 +583,7 @@ public void testChooseTargetWithStaleNodes() throws Exception {
assertFalse(isOnSameRack(targets[0], dataNodes[0]));
// reset
dataNodes[0].setLastUpdate(Time.now());
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[0], 0);
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
}
@ -599,7 +600,8 @@ public void testChooseTargetWithStaleNodes() throws Exception {
public void testChooseTargetWithHalfStaleNodes() throws Exception {
// Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
for (int i = 0; i < 3; i++) {
dataNodes[i].setLastUpdate(Time.now() - staleInterval - 1);
DFSTestUtil
.resetLastUpdatesWithOffset(dataNodes[i], -(staleInterval + 1));
}
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
@ -631,7 +633,7 @@ public void testChooseTargetWithHalfStaleNodes() throws Exception {
assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));
for (int i = 0; i < dataNodes.length; i++) {
dataNodes[i].setLastUpdate(Time.now());
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
}
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
@ -657,9 +659,10 @@ public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
for (int i = 0; i < 2; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
DatanodeDescriptor dnDes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));
}
// Instead of waiting, explicitly call heartbeatCheck to
// let the heartbeat manager detect stale nodes
@ -687,9 +690,9 @@ public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
for (int i = 0; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
DatanodeDescriptor dnDesc = miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, -(staleInterval + 1));
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()
@ -708,14 +711,15 @@ public void testChooseTargetWithMoreThanHalfStaleNodes() throws Exception {
assertEquals(targets.length, 3);
assertTrue(isOnSameRack(targets[0], staleNodeInfo));
// Step 3. Set 2 stale datanodes back to healthy nodes,
// still have 2 stale nodes
for (int i = 2; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now());
DatanodeDescriptor dnDesc = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, 0);
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()
@ -971,7 +975,7 @@ public void testChooseReplicaToDelete() throws Exception {
// Refresh the last update time for all the datanodes
for (int i = 0; i < dataNodes.length; i++) {
dataNodes[i].setLastUpdate(Time.now());
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
}
List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();

View File

@ -660,12 +660,12 @@ private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
String bpid = cluster.getNamesystem().getBlockPoolId();
Replica r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
long start = Time.now();
long start = Time.monotonicNow();
int count = 0;
while (r == null) {
waitTil(5);
r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
long waiting_period = Time.now() - start;
long waiting_period = Time.monotonicNow() - start;
if (count++ % 100 == 0)
if(LOG.isDebugEnabled()) {
LOG.debug("Has been waiting for " + waiting_period + " ms.");
@ -679,7 +679,7 @@ private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
if(LOG.isDebugEnabled()) {
LOG.debug("Replica state before the loop " + state.getValue());
}
start = Time.now();
start = Time.monotonicNow();
while (state != HdfsServerConstants.ReplicaState.TEMPORARY) {
waitTil(5);
state = r.getState();
@ -687,7 +687,7 @@ private void waitForTempReplica(Block bl, int DN_N1) throws IOException {
LOG.debug("Keep waiting for " + bl.getBlockName() +
" is in state " + state.getValue());
}
if (Time.now() - start > TIMEOUT)
if (Time.monotonicNow() - start > TIMEOUT)
assertTrue("Was waiting too long for a replica to become TEMPORARY",
tooLongWait);
}
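
waitForTempReplica() is the poll-until-deadline shape that recurs across these tests: read a monotonic start once, poll, and compare the elapsed time against TIMEOUT. Factored out, the pattern looks like this (a hypothetical helper, not part of DFSTestUtil):

static void pollUntil(java.util.function.BooleanSupplier done,
    long timeoutMs, long intervalMs)
    throws java.util.concurrent.TimeoutException, InterruptedException {
  long failtime = Time.monotonicNow() + timeoutMs;
  while (!done.getAsBoolean()) {
    if (Time.monotonicNow() > failtime) {
      throw new java.util.concurrent.TimeoutException(
          "condition not met within " + timeoutMs + " ms");
    }
    Thread.sleep(intervalMs);
  }
}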

View File

@ -73,7 +73,7 @@ public void testThrottler() throws IOException {
long bandwidthPerSec = 1024*1024L;
final long TOTAL_BYTES =6*bandwidthPerSec;
long bytesToSend = TOTAL_BYTES;
long start = Time.now();
long start = Time.monotonicNow();
DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec);
long totalBytes = 0L;
long bytesSent = 1024*512L; // 0.5MB
@ -86,7 +86,7 @@ public void testThrottler() throws IOException {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
throttler.throttle(bytesToSend);
long end = Time.now();
long end = Time.monotonicNow();
assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec);
}
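
The throttler assertion divides the byte count by (end - start). Measured on the wall clock, a backwards step during the transfer could make that interval zero or negative, turning the assertion into a divide-by-zero or a nonsense rate; on the monotonic clock the interval is always a true elapsed time. The measurement in isolation (a sketch):

static long observedBytesPerSec(long totalBytes, long startMonoMs) {
  long elapsedMs = Time.monotonicNow() - startMonoMs;
  // Guard only against a genuinely sub-millisecond run; a clock step
  // can no longer produce elapsedMs <= 0 here.
  return totalBytes * 1000 / Math.max(1, elapsedMs);
}
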
@ -254,7 +254,7 @@ private void checkBlocks(DatanodeInfo[] includeNodes, String fileName,
throws IOException, TimeoutException {
boolean notDone;
final long TIMEOUT = 20000L;
long starttime = Time.now();
long starttime = Time.monotonicNow();
long failtime = starttime + TIMEOUT;
do {
try {
@ -279,7 +279,7 @@ private void checkBlocks(DatanodeInfo[] includeNodes, String fileName,
}
}
}
if (Time.now() > failtime) {
if (Time.monotonicNow() > failtime) {
String expectedNodesList = "";
String currentNodesList = "";
for (DatanodeInfo dn : includeNodes)

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -199,7 +200,7 @@ public void testXceiverCount() throws Exception {
DataNode dn = datanodes.get(i);
DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
dn.shutdown();
dnd.setLastUpdate(0L);
DFSTestUtil.setDatanodeDead(dnd);
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
expectedInServiceNodes--;
assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());
@ -278,7 +279,7 @@ public void testXceiverCount() throws Exception {
dn.shutdown();
// force it to appear dead so live count decreases
DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId());
dnDesc.setLastUpdate(0L);
DFSTestUtil.setDatanodeDead(dnDesc);
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes());
// first few nodes are already out of service

View File

@ -43,6 +43,7 @@
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
@ -168,9 +169,10 @@ public void testStaleNodes() throws Exception {
long staleInterval = CONF.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
DatanodeDescriptor dnDes = cluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));
}
// Let the HeartbeatManager check heartbeats
BlockManagerTestUtil.checkHeartbeat(cluster.getNameNode().getNamesystem()
@ -181,9 +183,10 @@ public void testStaleNodes() throws Exception {
for (int i = 0; i < 2; i++) {
DataNode dn = cluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now());
DatanodeDescriptor dnDes = cluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDes, 0);
}
// Let the HeartbeatManager refresh