HDFS-6841. Use Time.monotonicNow() wherever applicable instead of Time.now(). Contributed by Vinayakumar B
parent d368d3647a
commit 75ead273be

@@ -1229,6 +1229,9 @@ Release 2.7.0 - UNRELEASED

    HDFS-7957. Truncate should verify quota before making changes. (jing9)

+   HDFS-6841. Use Time.monotonicNow() wherever applicable instead of Time.now()
+   (Vinayakumar B via kihwal)
+
    BREAKDOWN OF HDFS-7584 SUBTASKS AND RELATED JIRAS

    HDFS-7720. Quota by Storage Type API, tools and ClientNameNode
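Note: the whole patch is one mechanical idea. org.apache.hadoop.util.Time.now() is wall-clock time (backed by System.currentTimeMillis()) and can jump forwards or backwards when the system clock is adjusted, while Time.monotonicNow() is derived from System.nanoTime() and only moves forward, so it is the right clock for measuring intervals. A minimal standalone sketch of the distinction, using plain JDK calls as stand-ins for the Hadoop utility:

    // Illustrative sketch only: why elapsed-time math should use a monotonic
    // clock. Hadoop's Time.monotonicNow() is a millisecond wrapper around
    // System.nanoTime(); Time.now() wraps System.currentTimeMillis().
    public class MonotonicElapsed {
      /** Wall-clock millis: can jump on NTP or manual clock adjustments. */
      static long now() {
        return System.currentTimeMillis();
      }

      /** Monotonic millis: only meaningful for differences, never for Date. */
      static long monotonicNow() {
        return System.nanoTime() / 1_000_000L;
      }

      public static void main(String[] args) throws InterruptedException {
        long start = monotonicNow();
        Thread.sleep(100);
        long elapsed = monotonicNow() - start;   // immune to clock changes
        System.out.println("elapsed ~" + elapsed + " ms");
        // For display, a wall-clock timestamp is still the right choice:
        System.out.println("at " + new java.util.Date(now()));
      }
    }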
@@ -885,7 +885,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
    if (filesBeingWritten.isEmpty()) {
      return;
    }
-   lastLeaseRenewal = Time.now();
+   lastLeaseRenewal = Time.monotonicNow();
  }
}

@@ -902,7 +902,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
      return true;
    } catch (IOException e) {
      // Abort if the lease has already expired.
-     final long elapsed = Time.now() - getLastLeaseRenewal();
+     final long elapsed = Time.monotonicNow() - getLastLeaseRenewal();
      if (elapsed > HdfsConstants.LEASE_HARDLIMIT_PERIOD) {
        LOG.warn("Failed to renew lease for " + clientName + " for "
            + (elapsed/1000) + " seconds (>= hard-limit ="

@@ -1020,7 +1020,7 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
   * @see ClientProtocol#getServerDefaults()
   */
  public FsServerDefaults getServerDefaults() throws IOException {
-   long now = Time.now();
+   long now = Time.monotonicNow();
    if (now - serverDefaultsLastUpdate > SERVER_DEFAULTS_VALIDITY_PERIOD) {
      serverDefaults = namenode.getServerDefaults();
      serverDefaultsLastUpdate = now;
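The DFSClient hunks all follow the same pattern: a timestamp recorded with monotonicNow() is later subtracted from the current monotonic reading and compared with a period. A hedged sketch of that pattern; the constant and field names are illustrative, not the real DFSClient members:

    // Sketch of the lease-expiry check. HARD_LIMIT_MS is an assumed value;
    // DFSClient uses HdfsConstants.LEASE_HARDLIMIT_PERIOD.
    class LeaseClock {
      private static final long HARD_LIMIT_MS = 60 * 60 * 1000L; // assumed
      private long lastLeaseRenewal = monotonicNow();

      static long monotonicNow() {
        return System.nanoTime() / 1_000_000L;
      }

      void renewed() {
        lastLeaseRenewal = monotonicNow();      // record with the same clock ...
      }

      boolean pastHardLimit() {
        long elapsed = monotonicNow() - lastLeaseRenewal; // ... used for the diff
        return elapsed > HARD_LIMIT_MS;
      }

      public static void main(String[] args) {
        System.out.println("past hard limit? " + new LeaseClock().pastHardLimit());
      }
    }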
@@ -382,7 +382,7 @@ public class DFSOutputStream extends FSOutputSummer
   */
  @Override
  public void run() {
-   long lastPacket = Time.now();
+   long lastPacket = Time.monotonicNow();
    TraceScope scope = NullScope.INSTANCE;
    while (!streamerClosed && dfsClient.clientRunning) {
      // if the Responder encountered an error, shutdown Responder

@@ -406,7 +406,7 @@ public class DFSOutputStream extends FSOutputSummer
        synchronized (dataQueue) {
          // wait for a packet to be sent.
-         long now = Time.now();
+         long now = Time.monotonicNow();
          while ((!streamerClosed && !hasError && dfsClient.clientRunning
              && dataQueue.size() == 0 &&
              (stage != BlockConstructionStage.DATA_STREAMING ||

@@ -422,7 +422,7 @@ public class DFSOutputStream extends FSOutputSummer
              DFSClient.LOG.warn("Caught exception ", e);
            }
            doSleep = false;
-           now = Time.now();
+           now = Time.monotonicNow();
          }
          if (streamerClosed || hasError || !dfsClient.clientRunning) {
            continue;

@@ -521,7 +521,7 @@ public class DFSOutputStream extends FSOutputSummer
        } finally {
          writeScope.close();
        }
-       lastPacket = Time.now();
+       lastPacket = Time.monotonicNow();

        // update bytesSent
        long tmpBytesSent = one.getLastByteOffsetBlock();

@@ -760,8 +760,8 @@ public class DFSOutputStream extends FSOutputSummer
              // the local node or the only one in the pipeline.
              if (PipelineAck.isRestartOOBStatus(reply) &&
                  shouldWaitForRestart(i)) {
-               restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
-                   Time.now();
+               restartDeadline = dfsClient.getConf().datanodeRestartTimeout
+                   + Time.monotonicNow();
                setRestartingNodeIndex(i);
                String message = "A datanode is restarting: " + targets[i];
                DFSClient.LOG.info(message);

@@ -1175,7 +1175,7 @@ public class DFSOutputStream extends FSOutputSummer
            errorIndex = -1;
          }
          // still within the deadline
-         if (Time.now() < restartDeadline) {
+         if (Time.monotonicNow() < restartDeadline) {
            continue; // with in the deadline
          }
          // expired. declare the restarting node dead

@@ -1226,14 +1226,12 @@ public class DFSOutputStream extends FSOutputSummer
      errorIndex = -1;
      success = false;

-     long startTime = Time.now();
      DatanodeInfo[] excluded =
          excludedNodes.getAllPresent(excludedNodes.asMap().keySet())
          .keySet()
          .toArray(new DatanodeInfo[0]);
      block = oldBlock;
-     lb = locateFollowingBlock(startTime,
-         excluded.length > 0 ? excluded : null);
+     lb = locateFollowingBlock(excluded.length > 0 ? excluded : null);
      block = lb.getBlock();
      block.setNumBytes(0);
      bytesSent = 0;
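The restartDeadline hunks show the deadline idiom: add a timeout to monotonicNow() when the event starts, then compare a fresh monotonicNow() reading against the stored deadline. A small sketch; the timeout value and names are illustrative:

    // Deadline idiom used for the datanode-restart wait; the 30s timeout and
    // field names here are placeholders, not the real DFSOutputStream members.
    public class RestartDeadline {
      private static final long RESTART_TIMEOUT_MS = 30_000L; // assumed
      private long restartDeadline = 0;

      static long monotonicNow() {
        return System.nanoTime() / 1_000_000L;
      }

      void datanodeRestarting() {
        restartDeadline = RESTART_TIMEOUT_MS + monotonicNow();
      }

      boolean stillWaiting() {
        return monotonicNow() < restartDeadline;  // safe even if wall clock jumps
      }

      public static void main(String[] args) {
        RestartDeadline d = new RestartDeadline();
        d.datanodeRestarting();
        System.out.println("still waiting? " + d.stillWaiting());
      }
    }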
@@ -1380,7 +1378,7 @@ public class DFSOutputStream extends FSOutputSummer
        // Check whether there is a restart worth waiting for.
        if (checkRestart && shouldWaitForRestart(errorIndex)) {
          restartDeadline = dfsClient.getConf().datanodeRestartTimeout +
-             Time.now();
+             Time.monotonicNow();
          restartingNodeIndex.set(errorIndex);
          errorIndex = -1;
          DFSClient.LOG.info("Waiting for the datanode to be restarted: " +

@@ -1430,13 +1428,12 @@ public class DFSOutputStream extends FSOutputSummer
      }
    }

-   private LocatedBlock locateFollowingBlock(long start,
-       DatanodeInfo[] excludedNodes) throws IOException {
+   private LocatedBlock locateFollowingBlock(DatanodeInfo[] excludedNodes) throws IOException {
      int retries = dfsClient.getConf().nBlockWriteLocateFollowingRetry;
      long sleeptime = dfsClient.getConf().
          blockWriteLocateFollowingInitialDelayMs;
      while (true) {
-       long localstart = Time.now();
+       long localstart = Time.monotonicNow();
        while (true) {
          try {
            return dfsClient.namenode.addBlock(src, dfsClient.clientName,

@@ -1460,10 +1457,10 @@ public class DFSOutputStream extends FSOutputSummer
            } else {
              --retries;
              DFSClient.LOG.info("Exception while adding a block", e);
-             if (Time.now() - localstart > 5000) {
+             long elapsed = Time.monotonicNow() - localstart;
+             if (elapsed > 5000) {
                DFSClient.LOG.info("Waiting for replication for "
-                   + (Time.now() - localstart) / 1000
-                   + " seconds");
+                   + (elapsed / 1000) + " seconds");
              }
              try {
                DFSClient.LOG.warn("NotReplicatedYetException sleeping " + src
@@ -2253,7 +2250,7 @@ public class DFSOutputStream extends FSOutputSummer
  // should be called holding (this) lock since setTestFilename() may
  // be called during unit tests
  private void completeFile(ExtendedBlock last) throws IOException {
-   long localstart = Time.now();
+   long localstart = Time.monotonicNow();
    long sleeptime = dfsClient.getConf().
        blockWriteLocateFollowingInitialDelayMs;
    boolean fileComplete = false;

@@ -2263,8 +2260,9 @@ public class DFSOutputStream extends FSOutputSummer
          dfsClient.namenode.complete(src, dfsClient.clientName, last, fileId);
      if (!fileComplete) {
        final int hdfsTimeout = dfsClient.getHdfsTimeout();
-       if (!dfsClient.clientRunning ||
-           (hdfsTimeout > 0 && localstart + hdfsTimeout < Time.now())) {
+       if (!dfsClient.clientRunning
+           || (hdfsTimeout > 0
+               && localstart + hdfsTimeout < Time.monotonicNow())) {
          String msg = "Unable to close file because dfsclient " +
              " was unable to contact the HDFS servers." +
              " clientRunning " + dfsClient.clientRunning +

@@ -2280,7 +2278,7 @@ public class DFSOutputStream extends FSOutputSummer
          retries--;
          Thread.sleep(sleeptime);
          sleeptime *= 2;
-         if (Time.now() - localstart > 5000) {
+         if (Time.monotonicNow() - localstart > 5000) {
            DFSClient.LOG.info("Could not complete " + src + " retrying...");
          }
        } catch (InterruptedException ie) {
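completeFile() combines exponential backoff with a monotonic "how long have we been at this" check that only drives logging. A simplified, self-contained sketch of that loop; the delay, retry count and message are illustrative:

    // Sketch of the completeFile()-style retry loop: back off between attempts
    // and report slow progress based on a monotonic clock.
    public class RetryWithBackoff {
      static long monotonicNow() {
        return System.nanoTime() / 1_000_000L;
      }

      static boolean tryOnce() {
        return false;                          // stand-in for the RPC call
      }

      public static void main(String[] args) throws InterruptedException {
        long localstart = monotonicNow();
        long sleeptime = 400;                  // assumed initial delay
        int retries = 5;
        boolean done = false;
        while (!done && retries > 0) {
          done = tryOnce();
          if (!done) {
            retries--;
            Thread.sleep(sleeptime);
            sleeptime *= 2;                    // back off
            if (monotonicNow() - localstart > 5000) {
              System.out.println("Could not complete, retrying...");
            }
          }
        }
      }
    }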
@@ -278,7 +278,7 @@ class LeaseRenewer {
  /** Is the empty period longer than the grace period? */
  private synchronized boolean isRenewerExpired() {
    return emptyTime != Long.MAX_VALUE
-       && Time.now() - emptyTime > gracePeriod;
+       && Time.monotonicNow() - emptyTime > gracePeriod;
  }

  synchronized void put(final long inodeId, final DFSOutputStream out,

@@ -346,7 +346,7 @@ class LeaseRenewer {
        }
      }
      //discover the first time that all file-being-written maps are empty.
-     emptyTime = Time.now();
+     emptyTime = Time.monotonicNow();
    }
  }
}

@@ -361,7 +361,7 @@ class LeaseRenewer {
    }
    if (emptyTime == Long.MAX_VALUE) {
      //discover the first time that the client list is empty.
-     emptyTime = Time.now();
+     emptyTime = Time.monotonicNow();
    }
  }

@@ -434,9 +434,9 @@ class LeaseRenewer {
   * when the lease period is half over.
   */
  private void run(final int id) throws InterruptedException {
-   for(long lastRenewed = Time.now(); !Thread.interrupted();
+   for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
        Thread.sleep(getSleepPeriod())) {
-     final long elapsed = Time.now() - lastRenewed;
+     final long elapsed = Time.monotonicNow() - lastRenewed;
      if (elapsed >= getRenewalTime()) {
        try {
          renew();

@@ -444,7 +444,7 @@ class LeaseRenewer {
            LOG.debug("Lease renewer daemon for " + clientsString()
                + " with renew id " + id + " executed");
          }
-         lastRenewed = Time.now();
+         lastRenewed = Time.monotonicNow();
        } catch (SocketTimeoutException ie) {
          LOG.warn("Failed to renew lease for " + clientsString() + " for "
              + (elapsed/1000) + " seconds. Aborting ...", ie);

@@ -479,7 +479,7 @@ class LeaseRenewer {
        // registered with this renewer, stop the daemon after the grace
        // period.
        if (!clientsRunning() && emptyTime == Long.MAX_VALUE) {
-         emptyTime = Time.now();
+         emptyTime = Time.monotonicNow();
        }
      }
    }
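The LeaseRenewer daemon is a periodic loop: sleep, measure elapsed time on the monotonic clock, renew when the renewal interval has passed, and reset the reference point. A hedged, self-contained sketch; the interval and sleep values are illustrative:

    // Sketch of the LeaseRenewer-style periodic loop.
    public class PeriodicRenewer {
      static long monotonicNow() {
        return System.nanoTime() / 1_000_000L;
      }

      public static void main(String[] args) throws InterruptedException {
        final long renewalTimeMs = 1000;        // assumed half of a lease period
        long lastRenewed = monotonicNow();
        for (int i = 0; i < 5; i++) {
          Thread.sleep(250);
          long elapsed = monotonicNow() - lastRenewed;
          if (elapsed >= renewalTimeMs) {
            System.out.println("renewing after " + elapsed + " ms");
            lastRenewed = monotonicNow();
          }
        }
      }
    }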
@@ -49,6 +49,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
  private long cacheCapacity;
  private long cacheUsed;
  private long lastUpdate;
+ private long lastUpdateMonotonic;
  private int xceiverCount;
  private String location = NetworkTopology.DEFAULT_RACK;
  private String softwareVersion;

@@ -91,6 +92,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
    this.cacheCapacity = from.getCacheCapacity();
    this.cacheUsed = from.getCacheUsed();
    this.lastUpdate = from.getLastUpdate();
+   this.lastUpdateMonotonic = from.getLastUpdateMonotonic();
    this.xceiverCount = from.getXceiverCount();
    this.location = from.getNetworkLocation();
    this.adminState = from.getAdminState();

@@ -105,6 +107,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
    this.cacheCapacity = 0L;
    this.cacheUsed = 0L;
    this.lastUpdate = 0L;
+   this.lastUpdateMonotonic = 0L;
    this.xceiverCount = 0;
    this.adminState = null;
  }

@@ -117,13 +120,13 @@ public class DatanodeInfo extends DatanodeID implements Node {
  public DatanodeInfo(DatanodeID nodeID, String location,
      final long capacity, final long dfsUsed, final long remaining,
      final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
-     final long lastUpdate, final int xceiverCount,
-     final AdminStates adminState) {
+     final long lastUpdate, final long lastUpdateMonotonic,
+     final int xceiverCount, final AdminStates adminState) {
    this(nodeID.getIpAddr(), nodeID.getHostName(), nodeID.getDatanodeUuid(),
        nodeID.getXferPort(), nodeID.getInfoPort(), nodeID.getInfoSecurePort(),
        nodeID.getIpcPort(), capacity, dfsUsed, remaining, blockPoolUsed,
-       cacheCapacity, cacheUsed, lastUpdate, xceiverCount, location,
-       adminState);
+       cacheCapacity, cacheUsed, lastUpdate, lastUpdateMonotonic,
+       xceiverCount, location, adminState);
  }

  /** Constructor */

@@ -132,8 +135,9 @@ public class DatanodeInfo extends DatanodeID implements Node {
      final int infoSecurePort, final int ipcPort,
      final long capacity, final long dfsUsed, final long remaining,
      final long blockPoolUsed, final long cacheCapacity, final long cacheUsed,
-     final long lastUpdate, final int xceiverCount,
-     final String networkLocation, final AdminStates adminState) {
+     final long lastUpdate, final long lastUpdateMonotonic,
+     final int xceiverCount, final String networkLocation,
+     final AdminStates adminState) {
    super(ipAddr, hostName, datanodeUuid, xferPort, infoPort,
          infoSecurePort, ipcPort);
    this.capacity = capacity;

@@ -143,6 +147,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
    this.cacheCapacity = cacheCapacity;
    this.cacheUsed = cacheUsed;
    this.lastUpdate = lastUpdate;
+   this.lastUpdateMonotonic = lastUpdateMonotonic;
    this.xceiverCount = xceiverCount;
    this.location = networkLocation;
    this.adminState = adminState;

@@ -223,9 +228,26 @@ public class DatanodeInfo extends DatanodeID implements Node {
    return DFSUtil.getPercentRemaining(getCacheRemaining(), cacheCapacity);
  }

- /** The time when this information was accurate. */
+ /**
+  * Get the last update timestamp.
+  * Return value is suitable for Date conversion.
+  */
  public long getLastUpdate() { return lastUpdate; }

+ /**
+  * The time when this information was accurate. <br>
+  * Ps: So return value is ideal for calculation of time differences.
+  * Should not be used to convert to Date.
+  */
+ public long getLastUpdateMonotonic() { return lastUpdateMonotonic;}
+
+ /**
+  * Set lastUpdate monotonic time
+  */
+ public void setLastUpdateMonotonic(long lastUpdateMonotonic) {
+   this.lastUpdateMonotonic = lastUpdateMonotonic;
+ }
+
  /** number of active connections */
  public int getXceiverCount() { return xceiverCount; }

@@ -437,7 +459,7 @@ public class DatanodeInfo extends DatanodeID implements Node {
   * @return true if the node is stale
   */
  public boolean isStale(long staleInterval) {
-   return (Time.now() - lastUpdate) >= staleInterval;
+   return (Time.monotonicNow() - lastUpdateMonotonic) >= staleInterval;
  }

  /**
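DatanodeInfo now carries both timestamps: lastUpdate stays wall-clock so it can be rendered as a Date in reports and JSON, while lastUpdateMonotonic drives interval math such as isStale(). A sketch of that dual-clock pattern with simplified fields, not the real class:

    import java.util.Date;

    // Sketch of the dual-timestamp pattern from DatanodeInfo: one wall-clock
    // value for display, one monotonic value for interval arithmetic.
    public class NodeInfoSketch {
      private long lastUpdate;            // wall clock, OK to turn into a Date
      private long lastUpdateMonotonic;   // monotonic, only for differences

      void heartbeatReceived() {
        lastUpdate = System.currentTimeMillis();
        lastUpdateMonotonic = System.nanoTime() / 1_000_000L;
      }

      boolean isStale(long staleIntervalMs) {
        return (System.nanoTime() / 1_000_000L - lastUpdateMonotonic) >= staleIntervalMs;
      }

      String lastSeen() {
        return new Date(lastUpdate).toString();   // human-readable report
      }

      public static void main(String[] args) {
        NodeInfoSketch n = new NodeInfoSketch();
        n.heartbeatReceived();
        System.out.println("stale? " + n.isStale(30_000) + ", last seen " + n.lastSeen());
      }
    }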
@@ -642,8 +642,8 @@ public class PBHelper {
        di.hasLocation() ? di.getLocation() : null ,
        di.getCapacity(), di.getDfsUsed(), di.getRemaining(),
        di.getBlockPoolUsed(), di.getCacheCapacity(), di.getCacheUsed(),
-       di.getLastUpdate(), di.getXceiverCount(),
-       PBHelper.convert(di.getAdminState()));
+       di.getLastUpdate(), di.getLastUpdateMonotonic(),
+       di.getXceiverCount(), PBHelper.convert(di.getAdminState()));
  }

  static public DatanodeInfoProto convertDatanodeInfo(DatanodeInfo di) {

@@ -704,6 +704,7 @@ public class PBHelper {
        .setCacheCapacity(info.getCacheCapacity())
        .setCacheUsed(info.getCacheUsed())
        .setLastUpdate(info.getLastUpdate())
+       .setLastUpdateMonotonic(info.getLastUpdateMonotonic())
        .setXceiverCount(info.getXceiverCount())
        .setAdminState(PBHelper.convert(info.getAdminState()))
        .build();
@@ -672,7 +672,7 @@ public class Balancer {
   */
  @Override
  public int run(String[] args) {
-   final long startTime = Time.now();
+   final long startTime = Time.monotonicNow();
    final Configuration conf = getConf();

    try {

@@ -687,8 +687,10 @@ public class Balancer {
      System.out.println(e + ". Exiting ...");
      return ExitStatus.INTERRUPTED.getExitCode();
    } finally {
-     System.out.format("%-24s ", DateFormat.getDateTimeInstance().format(new Date()));
-     System.out.println("Balancing took " + time2Str(Time.now()-startTime));
+     System.out.format("%-24s ",
+         DateFormat.getDateTimeInstance().format(new Date()));
+     System.out.println("Balancing took "
+         + time2Str(Time.monotonicNow() - startTime));
    }
  }
@@ -315,7 +315,8 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous {
        continue;
      }
      final ReplicaUnderConstruction ruc = replicas.get(i);
-     final long lastUpdate = ruc.getExpectedStorageLocation().getDatanodeDescriptor().getLastUpdate();
+     final long lastUpdate = ruc.getExpectedStorageLocation()
+         .getDatanodeDescriptor().getLastUpdateMonotonic();
      if (lastUpdate > mostRecentLastUpdate) {
        primaryNodeIndex = i;
        primary = ruc;
@@ -1772,7 +1772,7 @@ public class BlockManager {
      final DatanodeStorage storage,
      final BlockListAsLongs newReport) throws IOException {
    namesystem.writeLock();
-   final long startTime = Time.now(); //after acquiring write lock
+   final long startTime = Time.monotonicNow(); //after acquiring write lock
    final long endTime;
    DatanodeDescriptor node;
    Collection<Block> invalidatedBlocks = null;

@@ -1810,7 +1810,7 @@ public class BlockManager {
      storageInfo.receivedBlockReport();
    } finally {
-     endTime = Time.now();
+     endTime = Time.monotonicNow();
      namesystem.writeUnlock();
    }

@@ -1840,7 +1840,7 @@ public class BlockManager {
    if (getPostponedMisreplicatedBlocksCount() == 0) {
      return;
    }
-   long startTimeRescanPostponedMisReplicatedBlocks = Time.now();
+   long startTimeRescanPostponedMisReplicatedBlocks = Time.monotonicNow();
    long startPostponedMisReplicatedBlocksCount =
        getPostponedMisreplicatedBlocksCount();
    namesystem.writeLock();

@@ -1900,7 +1900,7 @@ public class BlockManager {
      long endPostponedMisReplicatedBlocksCount =
          getPostponedMisreplicatedBlocksCount();
      LOG.info("Rescan of postponedMisreplicatedBlocks completed in " +
-         (Time.now() - startTimeRescanPostponedMisReplicatedBlocks) +
+         (Time.monotonicNow() - startTimeRescanPostponedMisReplicatedBlocks) +
          " msecs. " + endPostponedMisReplicatedBlocksCount +
          " blocks are left. " + (startPostponedMisReplicatedBlocksCount -
          endPostponedMisReplicatedBlocksCount) + " blocks are removed.");

@@ -2648,7 +2648,7 @@ public class BlockManager {
  private void processMisReplicatesAsync() throws InterruptedException {
    long nrInvalid = 0, nrOverReplicated = 0;
    long nrUnderReplicated = 0, nrPostponed = 0, nrUnderConstruction = 0;
-   long startTimeMisReplicatedScan = Time.now();
+   long startTimeMisReplicatedScan = Time.monotonicNow();
    Iterator<BlockInfoContiguous> blocksItr = blocksMap.getBlocks().iterator();
    long totalBlocks = blocksMap.size();
    replicationQueuesInitProgress = 0;

@@ -2706,7 +2706,8 @@ public class BlockManager {
        NameNode.stateChangeLog
            .info("STATE* Replication Queue initialization "
                + "scan for invalid, over- and under-replicated blocks "
-               + "completed in " + (Time.now() - startTimeMisReplicatedScan)
+               + "completed in "
+               + (Time.monotonicNow() - startTimeMisReplicatedScan)
                + " msec");
        break;
      }
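The BlockManager hunks time work done under the namesystem write lock: the start timestamp is taken after the lock is acquired, and the end in a finally block so the measurement survives exceptions. A simplified sketch of that shape with a plain ReentrantLock standing in for the namesystem lock:

    import java.util.concurrent.locks.ReentrantLock;

    // Sketch of timing a critical section the way processReport() does:
    // take the start after acquiring the lock, take the end in finally.
    public class TimedCriticalSection {
      private static final ReentrantLock writeLock = new ReentrantLock();

      static long monotonicNow() {
        return System.nanoTime() / 1_000_000L;
      }

      public static void main(String[] args) {
        writeLock.lock();
        final long startTime = monotonicNow();   // after acquiring the lock
        final long endTime;
        try {
          // ... process the block report ...
        } finally {
          endTime = monotonicNow();
          writeLock.unlock();
        }
        System.out.println("processed report in " + (endTime - startTime) + " msecs");
      }
    }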
@@ -17,7 +17,7 @@
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.util.*;

@@ -884,7 +884,7 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
      Collection<DatanodeStorageInfo> second,
      final List<StorageType> excessTypes) {
    long oldestHeartbeat =
-       now() - heartbeatInterval * tolerateHeartbeatMultiplier;
+       monotonicNow() - heartbeatInterval * tolerateHeartbeatMultiplier;
    DatanodeStorageInfo oldestHeartbeatStorage = null;
    long minSpace = Long.MAX_VALUE;
    DatanodeStorageInfo minSpaceStorage = null;

@@ -898,8 +898,8 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy {
      final DatanodeDescriptor node = storage.getDatanodeDescriptor();
      long free = node.getRemaining();
-     long lastHeartbeat = node.getLastUpdate();
-     if(lastHeartbeat < oldestHeartbeat) {
+     long lastHeartbeat = node.getLastUpdateMonotonic();
+     if (lastHeartbeat < oldestHeartbeat) {
        oldestHeartbeat = lastHeartbeat;
        oldestHeartbeatStorage = storage;
      }
@@ -405,7 +405,8 @@ public class DatanodeDescriptor extends DatanodeInfo {
    setCacheCapacity(cacheCapacity);
    setCacheUsed(cacheUsed);
    setXceiverCount(xceiverCount);
-   setLastUpdate(Time.now());
+   setLastUpdate(Time.now());
+   setLastUpdateMonotonic(Time.monotonicNow());
    this.volumeFailures = volFailures;
    this.volumeFailureSummary = volumeFailureSummary;
    for (StorageReport report : reports) {

@@ -420,7 +421,7 @@ public class DatanodeDescriptor extends DatanodeInfo {
      totalBlockPoolUsed += report.getBlockPoolUsed();
      totalDfsUsed += report.getDfsUsed();
    }
-   rollBlocksScheduled(getLastUpdate());
+   rollBlocksScheduled(getLastUpdateMonotonic());

    // Update total metrics for the node.
    setCapacity(totalCapacity);
@@ -17,6 +17,8 @@
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

+ import static org.apache.hadoop.util.Time.monotonicNow;
+
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.net.InetAddresses;

@@ -43,7 +45,6 @@ import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.net.*;
import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException;
import org.apache.hadoop.util.ReflectionUtils;
- import org.apache.hadoop.util.Time;

import java.io.IOException;
import java.io.PrintWriter;

@@ -581,8 +582,8 @@ public class DatanodeManager {

  /** Is the datanode dead? */
  boolean isDatanodeDead(DatanodeDescriptor node) {
-   return (node.getLastUpdate() <
-           (Time.now() - heartbeatExpireInterval));
+   return (node.getLastUpdateMonotonic() <
+           (monotonicNow() - heartbeatExpireInterval));
  }

  /** Add a datanode. */

@@ -1299,7 +1300,7 @@ public class DatanodeManager {
            .getAddress().getHostAddress(), addr.getHostName(), "",
            addr.getPort() == 0 ? defaultXferPort : addr.getPort(),
            defaultInfoPort, defaultInfoSecurePort, defaultIpcPort));
-       dn.setLastUpdate(0); // Consider this node dead for reporting
+       setDatanodeDead(dn);
        nodes.add(dn);
      }
    }

@@ -1332,6 +1333,7 @@ public class DatanodeManager {

  private void setDatanodeDead(DatanodeDescriptor node) {
    node.setLastUpdate(0);
+   node.setLastUpdateMonotonic(0);
  }

  /** Handle heartbeat from datanodes. */

@@ -1437,7 +1439,7 @@ public class DatanodeManager {
            blockPoolId, blks));
      }
      boolean sendingCachingCommands = false;
-     long nowMs = Time.monotonicNow();
+     long nowMs = monotonicNow();
      if (shouldSendCachingCommands &&
          ((nowMs - nodeinfo.getLastCachingDirectiveSentTimeMs()) >=
          timeBetweenResendingCachingDirectivesMs)) {
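isDatanodeDead() now compares the monotonic heartbeat timestamp against a monotonic "now" minus the expiry interval, so stepping the system clock can no longer mark every datanode dead or alive at once. A sketch of the check; the expiry value is only an assumption:

    // Sketch of the heartbeat-expiry check; 630s mirrors the usual
    // 2 * recheck-interval + 10 * heartbeat-interval default, but is assumed here.
    public class DeadNodeCheck {
      private static final long HEARTBEAT_EXPIRE_INTERVAL_MS = 630_000L; // assumed

      static long monotonicNow() {
        return System.nanoTime() / 1_000_000L;
      }

      static boolean isDatanodeDead(long lastUpdateMonotonic) {
        return lastUpdateMonotonic < (monotonicNow() - HEARTBEAT_EXPIRE_INTERVAL_MS);
      }

      public static void main(String[] args) {
        long lastHeartbeat = monotonicNow();
        System.out.println("dead right after heartbeat? " + isDatanodeDead(lastHeartbeat));
        System.out.println("dead with stale heartbeat?  "
            + isDatanodeDead(lastHeartbeat - HEARTBEAT_EXPIRE_INTERVAL_MS - 1));
      }
    }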
@@ -45,7 +45,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static com.google.common.base.Preconditions.checkArgument;
- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

/**
 * Manages datanode decommissioning. A background monitor thread

@@ -208,7 +208,7 @@ public class DecommissionManager {
        }
        // Update DN stats maintained by HeartbeatManager
        hbManager.startDecommission(node);
-       node.decommissioningStatus.setStartTime(now());
+       node.decommissioningStatus.setStartTime(monotonicNow());
        pendingNodes.add(node);
      }
    } else {
@@ -353,7 +353,7 @@ class HeartbeatManager implements DatanodeStatistics {
    public void run() {
      while(namesystem.isRunning()) {
        try {
-         final long now = Time.now();
+         final long now = Time.monotonicNow();
          if (lastHeartbeatCheck + heartbeatRecheckInterval < now) {
            heartbeatCheck();
            lastHeartbeatCheck = now;
@@ -17,7 +17,7 @@
 */
package org.apache.hadoop.hdfs.server.blockmanagement;

- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.PrintWriter;
import java.sql.Time;

@@ -177,7 +177,7 @@ class PendingReplicationBlocks {
    private final List<DatanodeDescriptor> targets;

    PendingBlockInfo(DatanodeDescriptor[] targets) {
-     this.timeStamp = now();
+     this.timeStamp = monotonicNow();
      this.targets = targets == null ? new ArrayList<DatanodeDescriptor>()
          : new ArrayList<DatanodeDescriptor>(Arrays.asList(targets));
    }

@@ -187,7 +187,7 @@ class PendingReplicationBlocks {
    }

    void setTimeStamp() {
-     timeStamp = now();
+     timeStamp = monotonicNow();
    }

    void incrementReplicas(DatanodeDescriptor... newTargets) {

@@ -234,7 +234,7 @@ class PendingReplicationBlocks {
      synchronized (pendingReplications) {
        Iterator<Map.Entry<Block, PendingBlockInfo>> iter =
            pendingReplications.entrySet().iterator();
-       long now = now();
+       long now = monotonicNow();
        if(LOG.isDebugEnabled()) {
          LOG.debug("PendingReplicationMonitor checking Q");
        }
@@ -17,7 +17,7 @@
 */
package org.apache.hadoop.hdfs.server.datanode;

- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.EOFException;
import java.io.IOException;

@@ -57,7 +57,6 @@ import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RemoteException;
- import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.hadoop.util.VersionUtil;

@@ -249,7 +248,7 @@ class BPServiceActor implements Runnable {
   */
  void scheduleBlockReport(long delay) {
    if (delay > 0) { // send BR after random delay
-     lastBlockReport = Time.now()
+     lastBlockReport = monotonicNow()
        - ( dnConf.blockReportInterval - DFSUtil.getRandom().nextInt((int)(delay)));
    } else { // send at next heartbeat
      lastBlockReport = lastHeartbeat - dnConf.blockReportInterval;

@@ -291,14 +290,14 @@ class BPServiceActor implements Runnable {

    // Send incremental block reports to the Namenode outside the lock
    boolean success = false;
-   final long startTime = Time.monotonicNow();
+   final long startTime = monotonicNow();
    try {
      bpNamenode.blockReceivedAndDeleted(bpRegistration,
          bpos.getBlockPoolId(),
          reports.toArray(new StorageReceivedDeletedBlocks[reports.size()]));
      success = true;
    } finally {
-     dn.getMetrics().addIncrementalBlockReport(Time.monotonicNow()-startTime);
+     dn.getMetrics().addIncrementalBlockReport(monotonicNow() - startTime);
      if (!success) {
        synchronized (pendingIncrementalBRperStorage) {
          for (StorageReceivedDeletedBlocks report : reports) {
@@ -442,7 +441,7 @@ class BPServiceActor implements Runnable {
   */
  List<DatanodeCommand> blockReport() throws IOException {
    // send block report if timer has expired.
-   final long startTime = now();
+   final long startTime = monotonicNow();
    if (startTime - lastBlockReport <= dnConf.blockReportInterval) {
      return null;
    }

@@ -456,7 +455,7 @@ class BPServiceActor implements Runnable {
    reportReceivedDeletedBlocks();
    lastDeletedReport = startTime;

-   long brCreateStartTime = now();
+   long brCreateStartTime = monotonicNow();
    Map<DatanodeStorage, BlockListAsLongs> perVolumeBlockLists =
        dn.getFSDataset().getBlockReports(bpos.getBlockPoolId());

@@ -476,7 +475,7 @@ class BPServiceActor implements Runnable {
    int numReportsSent = 0;
    int numRPCs = 0;
    boolean success = false;
-   long brSendStartTime = now();
+   long brSendStartTime = monotonicNow();
    try {
      if (totalBlockCount < dnConf.blockReportSplitThreshold) {
        // Below split threshold, send all reports in a single message.

@@ -503,7 +502,7 @@ class BPServiceActor implements Runnable {
      success = true;
    } finally {
      // Log the block report processing stats from Datanode perspective
-     long brSendCost = now() - brSendStartTime;
+     long brSendCost = monotonicNow() - brSendStartTime;
      long brCreateCost = brSendStartTime - brCreateStartTime;
      dn.getMetrics().addBlockReport(brSendCost);
      final int nCmds = cmds.size();

@@ -539,7 +538,7 @@ class BPServiceActor implements Runnable {
       * 1) normal like 9:20:18, next report should be at 10:20:14
       * 2) unexpected like 11:35:43, next report should be at 12:20:14
       */
-     lastBlockReport += (now() - lastBlockReport) /
+     lastBlockReport += (monotonicNow() - lastBlockReport) /
          dnConf.blockReportInterval * dnConf.blockReportInterval;
    }
  }

@@ -551,7 +550,7 @@ class BPServiceActor implements Runnable {
    }
    // send cache report if timer has expired.
    DatanodeCommand cmd = null;
-   final long startTime = Time.monotonicNow();
+   final long startTime = monotonicNow();
    if (startTime - lastCacheReport > dnConf.cacheReportInterval) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Sending cacheReport from service actor: " + this);

@@ -560,10 +559,10 @@ class BPServiceActor implements Runnable {

      String bpid = bpos.getBlockPoolId();
      List<Long> blockIds = dn.getFSDataset().getCacheReport(bpid);
-     long createTime = Time.monotonicNow();
+     long createTime = monotonicNow();

      cmd = bpNamenode.cacheReport(bpRegistration, bpid, blockIds);
-     long sendTime = Time.monotonicNow();
+     long sendTime = monotonicNow();
      long createCost = createTime - startTime;
      long sendCost = sendTime - createTime;
      dn.getMetrics().addCacheReport(sendCost);
@@ -670,7 +669,7 @@ class BPServiceActor implements Runnable {
    //
    while (shouldRun()) {
      try {
-       final long startTime = now();
+       final long startTime = monotonicNow();

        //
        // Every so often, send heartbeat or block-report

@@ -687,7 +686,7 @@ class BPServiceActor implements Runnable {
          if (!dn.areHeartbeatsDisabledForTests()) {
            HeartbeatResponse resp = sendHeartBeat();
            assert resp != null;
-           dn.getMetrics().addHeartbeat(now() - startTime);
+           dn.getMetrics().addHeartbeat(monotonicNow() - startTime);

            // If the state of this NN has changed (eg STANDBY->ACTIVE)
            // then let the BPOfferService update itself.

@@ -703,10 +702,10 @@ class BPServiceActor implements Runnable {
              handleRollingUpgradeStatus(resp);
            }

-           long startProcessCommands = now();
+           long startProcessCommands = monotonicNow();
            if (!processCommand(resp.getCommands()))
              continue;
-           long endProcessCommands = now();
+           long endProcessCommands = monotonicNow();
            if (endProcessCommands - startProcessCommands > 2000) {
              LOG.info("Took " + (endProcessCommands - startProcessCommands)
                  + "ms to process " + resp.getCommands().length

@@ -731,7 +730,7 @@ class BPServiceActor implements Runnable {
        // or work arrives, and then iterate again.
        //
        long waitTime = dnConf.heartBeatInterval -
-           (Time.now() - lastHeartbeat);
+           (monotonicNow() - lastHeartbeat);
        synchronized(pendingIncrementalBRperStorage) {
          if (waitTime > 0 && !sendImmediateIBR) {
            try {
@@ -25,7 +25,7 @@ import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status.SU
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.ShortCircuitFdResponse.DO_NOT_USE_RECEIPT_VERIFICATION;
import static org.apache.hadoop.hdfs.server.datanode.DataNode.DN_CLIENTTRACE_FORMAT;
- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;

@@ -247,7 +247,7 @@ class DataXceiver extends Receiver implements Runnable {
          peer.setReadTimeout(dnConf.socketTimeout);
        }

-       opStartTime = now();
+       opStartTime = monotonicNow();
        processOp(op);
        ++opsProcessed;
      } while ((peer != null) &&

@@ -1202,7 +1202,7 @@ class DataXceiver extends Receiver implements Runnable {
  }

  private long elapsed() {
-   return now() - opStartTime;
+   return monotonicNow() - opStartTime;
  }

  /**
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_BACKUP_HTTP_ADDRESS_KEY;
- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.File;
import java.io.IOException;

@@ -135,11 +135,11 @@ class Checkpointer extends Daemon {

    long lastCheckpointTime = 0;
    if (!backupNode.shouldCheckpointAtStartup()) {
-     lastCheckpointTime = now();
+     lastCheckpointTime = monotonicNow();
    }
    while(shouldRun) {
      try {
-       long now = now();
+       long now = monotonicNow();
        boolean shouldCheckpoint = false;
        if(now >= lastCheckpointTime + periodMSec) {
          shouldCheckpoint = true;

@@ -182,7 +182,7 @@ class Checkpointer extends Daemon {
    BackupImage bnImage = getFSImage();
    NNStorage bnStorage = bnImage.getStorage();

-   long startTime = now();
+   long startTime = monotonicNow();
    bnImage.freezeNamespaceAtNextRoll();

    NamenodeCommand cmd =

@@ -276,7 +276,7 @@ class Checkpointer extends Daemon {

    long imageSize = bnImage.getStorage().getFsImageName(txid).length();
    LOG.info("Checkpoint completed in "
-       + (now() - startTime)/1000 + " seconds."
+       + (monotonicNow() - startTime)/1000 + " seconds."
        + " New Image Size: " + imageSize);
  }
@@ -20,7 +20,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
import java.io.Closeable;

- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;

@@ -109,9 +109,9 @@ public abstract class EditLogOutputStream implements Closeable {

  public void flush(boolean durable) throws IOException {
    numSync++;
-   long start = now();
+   long start = monotonicNow();
    flushAndSync(durable);
-   long end = now();
+   long end = monotonicNow();
    totalTimeSync += (end - start);
  }
@@ -18,7 +18,7 @@
package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.util.ExitUtil.terminate;
- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.IOException;
import java.lang.reflect.Constructor;

@@ -230,7 +230,7 @@ public class FSEditLog implements LogsPurgeable {
    this.conf = conf;
    this.storage = storage;
    metrics = NameNode.getNameNodeMetrics();
-   lastPrintTime = now();
+   lastPrintTime = monotonicNow();

    // If this list is empty, an error will be thrown on first use
    // of the editlog, as no journals will exist

@@ -486,14 +486,14 @@ public class FSEditLog implements LogsPurgeable {
    //
    TransactionId id = myTransactionId.get();
    id.txid = txid;
-   return now();
+   return monotonicNow();
  }

  private void endTransaction(long start) {
    assert Thread.holdsLock(this);

    // update statistics
-   long end = now();
+   long end = monotonicNow();
    numTransactions++;
    totalTimeTransactions += (end-start);
    if (metrics != null) // Metrics is non-null only when used inside name node

@@ -640,7 +640,7 @@ public class FSEditLog implements LogsPurgeable {
      }

      // do the sync
-     long start = now();
+     long start = monotonicNow();
      try {
        if (logStream != null) {
          logStream.flush();

@@ -657,7 +657,7 @@ public class FSEditLog implements LogsPurgeable {
          terminate(1, msg);
        }
      }
-     long elapsed = now() - start;
+     long elapsed = monotonicNow() - start;

      if (metrics != null) { // Metrics non-null only when used inside name node
        metrics.addSync(elapsed);

@@ -679,7 +679,7 @@ public class FSEditLog implements LogsPurgeable {
  // print statistics every 1 minute.
  //
  private void printStatistics(boolean force) {
-   long now = now();
+   long now = monotonicNow();
    if (lastPrintTime + 60000 > now && !force) {
      return;
    }
@@ -19,7 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;

import static org.apache.hadoop.hdfs.server.namenode.FSEditLogOp.TruncateOp;
import static org.apache.hadoop.hdfs.server.namenode.FSImageFormat.renameReservedPathsOnUpgrade;
- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.FilterInputStream;
import java.io.IOException;

@@ -136,13 +136,13 @@ public class FSEditLogLoader {
    prog.beginStep(Phase.LOADING_EDITS, step);
    fsNamesys.writeLock();
    try {
-     long startTime = now();
+     long startTime = monotonicNow();
      FSImage.LOG.info("Start loading edits file " + edits.getName());
      long numEdits = loadEditRecords(edits, false, expectedStartingTxId,
          startOpt, recovery);
      FSImage.LOG.info("Edits file " + edits.getName()
          + " of size " + edits.length() + " edits # " + numEdits
-         + " loaded in " + (now()-startTime)/1000 + " seconds");
+         + " loaded in " + (monotonicNow()-startTime)/1000 + " seconds");
      return numEdits;
    } finally {
      edits.close();

@@ -177,7 +177,7 @@ public class FSEditLogLoader {
    Step step = createStartupProgressStep(in);
    prog.setTotal(Phase.LOADING_EDITS, step, numTxns);
    Counter counter = prog.getCounter(Phase.LOADING_EDITS, step);
-   long lastLogTime = now();
+   long lastLogTime = monotonicNow();
    long lastInodeId = fsNamesys.dir.getLastInodeId();

    try {

@@ -257,7 +257,7 @@ public class FSEditLogLoader {
        }
        // log progress
        if (op.hasTransactionId()) {
-         long now = now();
+         long now = monotonicNow();
          if (now - lastLogTime > REPLAY_TRANSACTION_LOG_INTERVAL) {
            long deltaTxId = lastAppliedTxId - expectedStartingTxId + 1;
            int percent = Math.round((float) deltaTxId / numTxns * 100);
@@ -17,7 +17,7 @@
 */
package org.apache.hadoop.hdfs.server.namenode;

- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.DataInput;
import java.io.DataInputStream;

@@ -309,7 +309,7 @@ public class FSImageFormat {
      StartupProgress prog = NameNode.getStartupProgress();
      Step step = new Step(StepType.INODES);
      prog.beginStep(Phase.LOADING_FSIMAGE, step);
-     long startTime = now();
+     long startTime = monotonicNow();

      //
      // Load in bits

@@ -441,8 +441,9 @@ public class FSImageFormat {
      imgDigest = new MD5Hash(digester.digest());
      loaded = true;

-     LOG.info("Image file " + curFile + " of size " + curFile.length() +
-         " bytes loaded in " + (now() - startTime)/1000 + " seconds.");
+     LOG.info("Image file " + curFile + " of size " + curFile.length()
+         + " bytes loaded in " + (monotonicNow() - startTime) / 1000
+         + " seconds.");
    }

    /** Update the root node's attributes */

@@ -1240,7 +1241,7 @@ public class FSImageFormat {
      prog.beginStep(Phase.SAVING_CHECKPOINT, step);
      prog.setTotal(Phase.SAVING_CHECKPOINT, step, numINodes);
      Counter counter = prog.getCounter(Phase.SAVING_CHECKPOINT, step);
-     long startTime = now();
+     long startTime = monotonicNow();
      //
      // Write out data
      //

@@ -1308,8 +1309,9 @@ public class FSImageFormat {
      // set md5 of the saved image
      savedDigest = new MD5Hash(digester.digest());

-     LOG.info("Image file " + newFile + " of size " + newFile.length() +
-         " bytes saved in " + (now() - startTime)/1000 + " seconds.");
+     LOG.info("Image file " + newFile + " of size " + newFile.length()
+         + " bytes saved in " + (monotonicNow() - startTime) / 1000
+         + " seconds.");
    }

    /**
@@ -88,6 +88,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.SECURITY_XATTR_UNREADABLE_BY_SUPERUSER;
import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.BufferedWriter;
import java.io.ByteArrayInputStream;

@@ -277,7 +278,6 @@ import org.apache.hadoop.util.ChunkedArrayList;
import org.apache.hadoop.util.Daemon;
import org.apache.hadoop.util.DataChecksum;
import org.apache.hadoop.util.StringUtils;
- import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Appender;
import org.apache.log4j.AsyncAppender;

@@ -683,7 +683,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
      namesystem.setSafeMode(SafeModeAction.SAFEMODE_ENTER);
    }

-   long loadStart = now();
+   long loadStart = monotonicNow();
    try {
      namesystem.loadFSImage(startOpt);
    } catch (IOException ioe) {

@@ -691,7 +691,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
      fsImage.close();
      throw ioe;
    }
-   long timeTakenToLoadFSImage = now() - loadStart;
+   long timeTakenToLoadFSImage = monotonicNow() - loadStart;
    LOG.info("Finished loading FSImage in " + timeTakenToLoadFSImage + " msecs");
    NameNodeMetrics nnMetrics = NameNode.getNameNodeMetrics();
    if (nnMetrics != null) {

@@ -5071,6 +5071,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     * <br> >0 safe mode is on, but we are in extension period
     */
    private long reached = -1;
+   private long reachedTimestamp = -1;
    /** Total number of blocks. */
    int blockTotal;
    /** Number of safe blocks. */

@@ -5171,6 +5172,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
     */
    private void enter() {
      this.reached = 0;
+     this.reachedTimestamp = 0;
    }

    /**

@@ -5194,6 +5196,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
        NameNode.stateChangeLog.info("STATE* Safe mode is OFF");
      }
      reached = -1;
+     reachedTimestamp = -1;
      safeMode = null;
      final NetworkTopology nt = blockManager.getDatanodeManager().getNetworkTopology();
      NameNode.stateChangeLog.info("STATE* Network topology has "
@@ -5232,7 +5235,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
        return false;
      }

-     if (now() - reached < extension) {
+     if (monotonicNow() - reached < extension) {
        reportStatus("STATE* Safe mode ON, in safe mode extension.", false);
        return false;
      }

@@ -5288,7 +5291,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
        return;
      }
      // start monitor
-     reached = now();
+     reached = monotonicNow();
+     reachedTimestamp = now();
      if (smmthread == null) {
        smmthread = new Daemon(new SafeModeMonitor());
        smmthread.start();

@@ -5435,8 +5439,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,

      if (!thresholdsMet) {
        msg += "once the thresholds have been reached.";
-     } else if (reached + extension - now() > 0) {
-       msg += ("in " + (reached + extension - now()) / 1000 + " seconds.");
+     } else if (reached + extension - monotonicNow() > 0) {
+       msg += ("in " + (reached + extension - monotonicNow()) / 1000 + " seconds.");
      } else {
        msg += "soon.";
      }

@@ -5462,7 +5466,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
          + ". Target blocks = " + blockThreshold + " for threshold = %" + threshold
          + ". Minimal replication = " + safeReplication + ".";
      if (reached > 0)
-       resText += " Threshold was reached " + new Date(reached) + ".";
+       resText += " Threshold was reached " + new Date(reachedTimestamp) + ".";
      return resText;
    }

@@ -5941,7 +5945,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
  @Metric
  public long getMillisSinceLastLoadedEdits() {
    if (isInStandbyState() && editLogTailer != null) {
-     return now() - editLogTailer.getLastLoadTimestamp();
+     return monotonicNow() - editLogTailer.getLastLoadTimeMs();
    } else {
      return 0;
    }

@@ -6983,7 +6987,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
  }

  private long getLastContact(DatanodeDescriptor alivenode) {
-   return (Time.now() - alivenode.getLastUpdate())/1000;
+   return (monotonicNow() - alivenode.getLastUpdateMonotonic())/1000;
  }

  private long getDfsUsed(DatanodeDescriptor alivenode) {
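SafeModeInfo now keeps two values when the block threshold is reached: reached (monotonic, used for the extension-period arithmetic) and reachedTimestamp (wall clock, only for the human-readable report). A simplified sketch of that split; the extension value is an assumption:

    import java.util.Date;

    // Sketch of the SafeModeInfo change: monotonic value for the extension math,
    // wall-clock value for the status text.
    public class SafeModeClockSketch {
      private static final long EXTENSION_MS = 30_000L;   // assumed
      private long reached = -1;           // monotonic
      private long reachedTimestamp = -1;  // wall clock, for display only

      static long monotonicNow() {
        return System.nanoTime() / 1_000_000L;
      }

      void thresholdReached() {
        reached = monotonicNow();
        reachedTimestamp = System.currentTimeMillis();
      }

      boolean stillInExtension() {
        return reached >= 0 && monotonicNow() - reached < EXTENSION_MS;
      }

      String report() {
        return reached > 0 ? "Threshold was reached " + new Date(reachedTimestamp) + "." : "";
      }

      public static void main(String[] args) {
        SafeModeClockSketch s = new SafeModeClockSketch();
        s.thresholdReached();
        System.out.println("in extension? " + s.stillInExtension() + " " + s.report());
      }
    }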
@@ -17,7 +17,7 @@
 */
package org.apache.hadoop.hdfs.server.namenode;

- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.IOException;
import java.util.ArrayList;

@@ -256,17 +256,17 @@ public class LeaseManager {
    }
    /** Only LeaseManager object can renew a lease */
    private void renew() {
-     this.lastUpdate = now();
+     this.lastUpdate = monotonicNow();
    }

    /** @return true if the Hard Limit Timer has expired */
    public boolean expiredHardLimit() {
-     return now() - lastUpdate > hardLimit;
+     return monotonicNow() - lastUpdate > hardLimit;
    }

    /** @return true if the Soft Limit Timer has expired */
    public boolean expiredSoftLimit() {
-     return now() - lastUpdate > softLimit;
+     return monotonicNow() - lastUpdate > softLimit;
    }

    /** Does this lease contain any path? */
@@ -289,7 +289,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
   * Check files on DFS, starting from the indicated path.
   */
  public void fsck() {
-   final long startTime = Time.now();
+   final long startTime = Time.monotonicNow();
    try {
      if(blockIds != null) {

@@ -357,7 +357,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
      }

      out.println("FSCK ended at " + new Date() + " in "
-         + (Time.now() - startTime + " milliseconds"));
+         + (Time.monotonicNow() - startTime + " milliseconds"));

      // If there were internal errors during the fsck operation, we want to
      // return FAILURE_STATUS, even if those errors were not immediately

@@ -383,7 +383,7 @@ public class NamenodeFsck implements DataEncryptionKeyFactory {
      String errMsg = "Fsck on path '" + path + "' " + FAILURE_STATUS;
      LOG.warn(errMsg, e);
      out.println("FSCK ended at " + new Date() + " in "
-         + (Time.now() - startTime + " milliseconds"));
+         + (Time.monotonicNow() - startTime + " milliseconds"));
      out.println(e.getMessage());
      out.print("\n\n" + errMsg);
    } finally {
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.security.SecurityUtil;

- import static org.apache.hadoop.util.Time.now;
+ import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.util.ExitUtil.terminate;

import com.google.common.annotations.VisibleForTesting;

@@ -84,7 +84,7 @@ public class EditLogTailer {
   * The last time we successfully loaded a non-zero number of edits from the
   * shared directory.
   */
- private long lastLoadTimestamp;
+ private long lastLoadTimeMs;

  /**
   * How often the Standby should roll edit logs. Since the Standby only reads

@@ -105,7 +105,7 @@ public class EditLogTailer {
    this.namesystem = namesystem;
    this.editLog = namesystem.getEditLog();

-   lastLoadTimestamp = now();
+   lastLoadTimeMs = monotonicNow();

    logRollPeriodMs = conf.getInt(DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_KEY,
        DFSConfigKeys.DFS_HA_LOGROLL_PERIOD_DEFAULT) * 1000;

@@ -241,7 +241,7 @@ public class EditLogTailer {
      }

      if (editsLoaded > 0) {
-       lastLoadTimestamp = now();
+       lastLoadTimeMs = monotonicNow();
      }
      lastLoadedTxnId = image.getLastAppliedTxId();
    } finally {

@@ -250,10 +250,10 @@ public class EditLogTailer {
  }

  /**
-  * @return timestamp (in msec) of when we last loaded a non-zero number of edits.
+  * @return time in msec of when we last loaded a non-zero number of edits.
   */
- public long getLastLoadTimestamp() {
-   return lastLoadTimestamp;
+ public long getLastLoadTimeMs() {
+   return lastLoadTimeMs;
  }

  /**

@@ -261,7 +261,7 @@ public class EditLogTailer {
   */
  private boolean tooLongSinceLastLoad() {
    return logRollPeriodMs >= 0 &&
-     (now() - lastLoadTimestamp) > logRollPeriodMs ;
+     (monotonicNow() - lastLoadTimeMs) > logRollPeriodMs ;
  }

  /**
@@ -284,6 +284,7 @@ public class JsonUtil {
    m.put("cacheCapacity", datanodeinfo.getCacheCapacity());
    m.put("cacheUsed", datanodeinfo.getCacheUsed());
    m.put("lastUpdate", datanodeinfo.getLastUpdate());
+   m.put("lastUpdateMonotonic", datanodeinfo.getLastUpdateMonotonic());
    m.put("xceiverCount", datanodeinfo.getXceiverCount());
    m.put("networkLocation", datanodeinfo.getNetworkLocation());
    m.put("adminState", datanodeinfo.getAdminState().name());

@@ -379,6 +380,7 @@ public class JsonUtil {
        getLong(m, "cacheCapacity", 0l),
        getLong(m, "cacheUsed", 0l),
        getLong(m, "lastUpdate", 0l),
+       getLong(m, "lastUpdateMonotonic", 0l),
        getInt(m, "xceiverCount", 0),
        getString(m, "networkLocation", ""),
        AdminStates.valueOf(getString(m, "adminState", "NORMAL")));
@@ -97,6 +97,7 @@ message DatanodeInfoProto {
  optional AdminState adminState = 10 [default = NORMAL];
  optional uint64 cacheCapacity = 11 [default = 0];
  optional uint64 cacheUsed = 12 [default = 0];
+ optional uint64 lastUpdateMonotonic = 13 [default = 0];
}

/**
@@ -18,6 +18,7 @@

package org.apache.hadoop.hdfs;

+ import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Charsets;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;

@@ -89,6 +90,7 @@ import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.StringUtils;
+ import org.apache.hadoop.util.Time;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.VersionInfo;
import org.apache.log4j.Level;

@@ -1019,7 +1021,7 @@ public class DFSTestUtil {
        DFSConfigKeys.DFS_DATANODE_HTTP_DEFAULT_PORT,
        DFSConfigKeys.DFS_DATANODE_HTTPS_DEFAULT_PORT,
        DFSConfigKeys.DFS_DATANODE_IPC_DEFAULT_PORT,
-       1l, 2l, 3l, 4l, 0l, 0l, 5, 6, "local", adminState);
+       1l, 2l, 3l, 4l, 0l, 0l, 0l, 5, 6, "local", adminState);
  }

  public static DatanodeDescriptor getDatanodeDescriptor(String ipAddr,

@@ -1571,9 +1573,11 @@ public class DFSTestUtil {
    // the one to be in charge of the synchronization / recovery protocol.
    final DatanodeStorageInfo[] storages = ucBlock.getExpectedStorageLocations();
    DatanodeStorageInfo expectedPrimary = storages[0];
-   long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor().getLastUpdate();
+   long mostRecentLastUpdate = expectedPrimary.getDatanodeDescriptor()
+       .getLastUpdateMonotonic();
    for (int i = 1; i < storages.length; i++) {
-     final long lastUpdate = storages[i].getDatanodeDescriptor().getLastUpdate();
+     final long lastUpdate = storages[i].getDatanodeDescriptor()
+         .getLastUpdateMonotonic();
      if (lastUpdate > mostRecentLastUpdate) {
        expectedPrimary = storages[i];
        mostRecentLastUpdate = lastUpdate;

@@ -1710,4 +1714,21 @@ public class DFSTestUtil {
    GenericTestUtils.setLogLevel(NameNode.stateChangeLog, level);
    GenericTestUtils.setLogLevel(NameNode.blockStateChangeLog, level);
  }
+
+ /**
+  * Set the datanode dead
+  */
+ public static void setDatanodeDead(DatanodeInfo dn) {
+   dn.setLastUpdate(0);
+   dn.setLastUpdateMonotonic(0);
+ }
+
+ /**
+  * Update lastUpdate and lastUpdateMonotonic with some offset.
+  */
+ public static void resetLastUpdatesWithOffset(DatanodeInfo dn, long offset) {
+   dn.setLastUpdate(Time.now() + offset);
+   dn.setLastUpdateMonotonic(Time.monotonicNow() + offset);
+ }
+
}
@@ -2076,7 +2076,7 @@ public class MiniDFSCluster {
  public void setDataNodeDead(DatanodeID dnId) throws IOException {
    DatanodeDescriptor dnd =
        NameNodeAdapter.getDatanode(getNamesystem(), dnId);
-   dnd.setLastUpdate(0L);
+   DFSTestUtil.setDatanodeDead(dnd);
    BlockManagerTestUtil.checkHeartbeat(getNamesystem().getBlockManager());
  }
@@ -132,7 +132,8 @@ public class TestGetBlocks {
    staleNodeInfo = cluster.getNameNode().getNamesystem().getBlockManager()
        .getDatanodeManager()
        .getDatanode(staleNode.getDatanodeId());
-   staleNodeInfo.setLastUpdate(Time.now() - staleInterval - 1);
+   DFSTestUtil.resetLastUpdatesWithOffset(staleNodeInfo,
+       -(staleInterval + 1));

    LocatedBlocks blocksAfterStale = client.getNamenode().getBlockLocations(
        fileName.toString(), 0, blockSize);

@@ -143,8 +144,7 @@ public class TestGetBlocks {
    // restart the staleNode's heartbeat
    DataNodeTestUtils.setHeartbeatsDisabledForTests(staleNode, false);
    // reset the first node as non-stale, so as to avoid two stale nodes
-   staleNodeInfo.setLastUpdate(Time.now());
-
+   DFSTestUtil.resetLastUpdatesWithOffset(staleNodeInfo, 0);
    LocatedBlock lastBlock = client.getLocatedBlocks(fileName.toString(), 0,
        Long.MAX_VALUE).getLastLocatedBlock();
    nodes = lastBlock.getLocations();

@@ -153,10 +153,10 @@ public class TestGetBlocks {
    staleNode = this.stopDataNodeHeartbeat(cluster, nodes[0].getHostName());
    assertNotNull(staleNode);
    // set the node as stale
-   cluster.getNameNode().getNamesystem().getBlockManager()
-       .getDatanodeManager()
-       .getDatanode(staleNode.getDatanodeId())
-       .setLastUpdate(Time.now() - staleInterval - 1);
+   DatanodeDescriptor dnDesc = cluster.getNameNode().getNamesystem()
+       .getBlockManager().getDatanodeManager()
+       .getDatanode(staleNode.getDatanodeId());
+   DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, -(staleInterval + 1));

    LocatedBlock lastBlockAfterStale = client.getLocatedBlocks(
        fileName.toString(), 0, Long.MAX_VALUE).getLastLocatedBlock();
@@ -59,7 +59,7 @@ public class TestInjectionForSimulatedStorage {
ClientProtocol namenode,
int expected, long maxWaitSec)
throws IOException {
long start = Time.now();
long start = Time.monotonicNow();

//wait for all the blocks to be replicated;
LOG.info("Checking for block replication for " + filename);

@@ -84,7 +84,7 @@ public class TestInjectionForSimulatedStorage {
actual + ".");

if (maxWaitSec > 0 &&
(Time.now() - start) > (maxWaitSec * 1000)) {
(Time.monotonicNow() - start) > (maxWaitSec * 1000)) {
throw new IOException("Timedout while waiting for all blocks to " +
" be replicated for " + filename);
}

@@ -101,7 +101,7 @@ public class TestLease {
// call renewLease() manually.
// make it look like the soft limit has been exceeded.
LeaseRenewer originalRenewer = dfs.getLeaseRenewer();
dfs.lastLeaseRenewal = Time.now()
dfs.lastLeaseRenewal = Time.monotonicNow()
- HdfsConstants.LEASE_SOFTLIMIT_PERIOD - 1000;
try {
dfs.renewLease();

@@ -117,7 +117,7 @@ public class TestLease {
}

// make it look like the hard limit has been exceeded.
dfs.lastLeaseRenewal = Time.now()
dfs.lastLeaseRenewal = Time.monotonicNow()
- HdfsConstants.LEASE_HARDLIMIT_PERIOD - 1000;
dfs.renewLease();

@@ -111,8 +111,8 @@ public class TestLeaseRenewer {
renewer.put(fileId, mockStream, MOCK_DFSCLIENT);

// Wait for lease to get renewed
long failTime = Time.now() + 5000;
while (Time.now() < failTime &&
long failTime = Time.monotonicNow() + 5000;
while (Time.monotonicNow() < failTime &&
leaseRenewalCount.get() == 0) {
Thread.sleep(50);
}

@@ -193,11 +193,11 @@ public class TestLeaseRenewer {

// Pretend to close the file
renewer.closeFile(fileId, MOCK_DFSCLIENT);
renewer.setEmptyTime(Time.now());
renewer.setEmptyTime(Time.monotonicNow());

// Should stop the renewer running within a few seconds
long failTime = Time.now() + 5000;
while (renewer.isRunning() && Time.now() < failTime) {
long failTime = Time.monotonicNow() + 5000;
while (renewer.isRunning() && Time.monotonicNow() < failTime) {
Thread.sleep(50);
}
Assert.assertFalse(renewer.isRunning());

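The waits above all follow the same shape: take a deadline from Time.monotonicNow() and poll until the condition holds or the deadline passes. A minimal self-contained sketch of that pattern (illustrative only, not code from this commit; the class and method names are placeholders):

import java.util.concurrent.Callable;
import org.apache.hadoop.util.Time;

// Illustrative sketch of the deadline/poll idiom used by the tests above.
public class DeadlinePollSketch {
  static boolean waitFor(Callable<Boolean> condition, long timeoutMs)
      throws Exception {
    final long failTime = Time.monotonicNow() + timeoutMs;
    while (Time.monotonicNow() < failTime) {
      if (condition.call()) {
        return true;   // condition met before the deadline
      }
      Thread.sleep(50); // back off briefly between checks
    }
    return false;      // deadline passed
  }
}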
@@ -333,7 +333,7 @@ public class TestParallelReadUtil {
}

// Start the workers and wait
long starttime = Time.now();
long starttime = Time.monotonicNow();
for (ReadWorker worker : workers) {
worker.start();
}

@@ -343,7 +343,7 @@ public class TestParallelReadUtil {
worker.join();
} catch (InterruptedException ignored) { }
}
long endtime = Time.now();
long endtime = Time.monotonicNow();

// Cleanup
for (TestFileInfo testInfo : testInfoArr) {

@@ -272,7 +272,7 @@ public class TestReplication {
ClientProtocol namenode,
int expected, long maxWaitSec)
throws IOException {
long start = Time.now();
long start = Time.monotonicNow();

//wait for all the blocks to be replicated;
LOG.info("Checking for block replication for " + filename);

@@ -298,7 +298,7 @@ public class TestReplication {
}

if (maxWaitSec > 0 &&
(Time.now() - start) > (maxWaitSec * 1000)) {
(Time.monotonicNow() - start) > (maxWaitSec * 1000)) {
throw new IOException("Timedout while waiting for all blocks to " +
" be replicated for " + filename);
}

@@ -262,7 +262,7 @@ public class TestBalancer {
throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.now() + timeout;
: Time.monotonicNow() + timeout;

while (true) {
long[] status = client.getStats();

@@ -274,7 +274,7 @@ public class TestBalancer {
&& usedSpaceVariance < CAPACITY_ALLOWED_VARIANCE)
break; //done

if (Time.now() > failtime) {
if (Time.monotonicNow() > failtime) {
throw new TimeoutException("Cluster failed to reached expected values of "
+ "totalSpace (current: " + status[0]
+ ", expected: " + expectedTotalSpace

@@ -369,7 +369,7 @@ public class TestBalancer {
int expectedExcludedNodes) throws IOException, TimeoutException {
long timeout = TIMEOUT;
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
: Time.now() + timeout;
: Time.monotonicNow() + timeout;
if (!p.nodesToBeIncluded.isEmpty()) {
totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
}

@@ -399,7 +399,7 @@ public class TestBalancer {
}
if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
balanced = false;
if (Time.now() > failtime) {
if (Time.monotonicNow() > failtime) {
throw new TimeoutException(
"Rebalancing expected avg utilization to become "
+ avgUtilization + ", but on datanode " + datanode

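The balancer tests use a throwing variant of the same idea: compute failtime once from the monotonic clock and raise TimeoutException inside the retry loop once it is exceeded. A hedged sketch under assumed names (this is not the tests' actual code):

import java.util.concurrent.TimeoutException;
import org.apache.hadoop.util.Time;

// Illustrative sketch of the throwing deadline check seen in the hunks above.
public class DeadlineCheckSketch {
  private final long failtime;

  DeadlineCheckSketch(long timeoutMs) {
    // A non-positive timeout means "wait forever".
    this.failtime = (timeoutMs <= 0L) ? Long.MAX_VALUE
        : Time.monotonicNow() + timeoutMs;
  }

  void check(String what) throws TimeoutException {
    if (Time.monotonicNow() > failtime) {
      throw new TimeoutException("Timed out waiting for " + what);
    }
  }
}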
@@ -186,7 +186,7 @@ public class BlockManagerTestUtil {
Assert.assertNotNull("Could not find DN with name: " + dnName, theDND);

synchronized (hbm) {
theDND.setLastUpdate(0);
DFSTestUtil.setDatanodeDead(theDND);
hbm.heartbeatCheck();
}
} finally {

@@ -23,6 +23,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.server.common.GenerationStamp;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;
import org.apache.hadoop.util.Time;
import org.junit.Test;

/**

@@ -46,40 +47,34 @@ public class TestBlockInfoUnderConstruction {
new DatanodeStorageInfo[] {s1, s2, s3});

// Recovery attempt #1.
long currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 3 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 2 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -3 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000);
blockInfo.initializeBlockRecovery(1);
BlockInfoContiguousUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);

// Recovery attempt #2.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 3 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
blockInfo.initializeBlockRecovery(2);
blockInfoRecovery = dd1.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);

// Recovery attempt #3.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime - 3 * 1000);
currentTime = System.currentTimeMillis();
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, -3 * 1000);
blockInfo.initializeBlockRecovery(3);
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);

// Recovery attempt #4.
// Reset everything. And again pick DN with most recent heart beat.
currentTime = System.currentTimeMillis();
dd1.setLastUpdate(currentTime - 2 * 1000);
dd2.setLastUpdate(currentTime - 1 * 1000);
dd3.setLastUpdate(currentTime);
currentTime = System.currentTimeMillis();
DFSTestUtil.resetLastUpdatesWithOffset(dd1, -2 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
blockInfo.initializeBlockRecovery(3);
blockInfoRecovery = dd3.getLeaseRecoveryCommand(1);
assertEquals(blockInfoRecovery[0], blockInfo);

@@ -23,6 +23,7 @@ import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.Block;

@@ -38,6 +39,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.util.Time;
import org.junit.Test;

/**

@@ -164,9 +166,9 @@ public class TestHeartbeatHandling {
NameNodeAdapter.sendHeartBeat(nodeReg3, dd3, namesystem);

// Test with all alive nodes.
dd1.setLastUpdate(System.currentTimeMillis());
dd2.setLastUpdate(System.currentTimeMillis());
dd3.setLastUpdate(System.currentTimeMillis());
DFSTestUtil.resetLastUpdatesWithOffset(dd1, 0);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, 0);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
final DatanodeStorageInfo[] storages = {
dd1.getStorageInfos()[0],
dd2.getStorageInfos()[0],

@@ -189,10 +191,10 @@ public class TestHeartbeatHandling {
assertEquals(recoveringNodes[2], dd3);

// Test with one stale node.
dd1.setLastUpdate(System.currentTimeMillis());
DFSTestUtil.resetLastUpdatesWithOffset(dd1, 0);
// More than the default stale interval of 30 seconds.
dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
dd3.setLastUpdate(System.currentTimeMillis());
DFSTestUtil.resetLastUpdatesWithOffset(dd2, -40 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, 0);
blockInfo = new BlockInfoContiguousUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
BlockUCState.UNDER_RECOVERY, storages);

@@ -210,10 +212,10 @@ public class TestHeartbeatHandling {
assertEquals(recoveringNodes[1], dd3);

// Test with all stale node.
dd1.setLastUpdate(System.currentTimeMillis() - 60 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd1, - 60 * 1000);
// More than the default stale interval of 30 seconds.
dd2.setLastUpdate(System.currentTimeMillis() - 40 * 1000);
dd3.setLastUpdate(System.currentTimeMillis() - 80 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd2, - 40 * 1000);
DFSTestUtil.resetLastUpdatesWithOffset(dd3, - 80 * 1000);
blockInfo = new BlockInfoContiguousUnderConstruction(
new Block(0, 0, GenerationStamp.LAST_RESERVED_STAMP), (short) 3,
BlockUCState.UNDER_RECOVERY, storages);

@@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

@@ -142,7 +143,7 @@ public class TestHostFileManager {
.DatanodeReportType.DEAD).size());
DatanodeDescriptor spam = new DatanodeDescriptor(new DatanodeID("127.0.0" +
".3", "127.0.0.3", "uuid-spam", 12345, 1020, 1021, 1022));
spam.setLastUpdate(0);
DFSTestUtil.setDatanodeDead(spam);
includedNodes.add(entry("127.0.0.3:12345"));
dnMap.put("uuid-spam", spam);
Assert.assertEquals(1, dm.getDatanodeListForReport(HdfsConstants

@@ -137,7 +137,7 @@ public class TestNodeCount {

void initializeTimeout(long timeout) {
this.timeout = timeout;
this.failtime = Time.now()
this.failtime = Time.monotonicNow()
+ ((timeout <= 0) ? Long.MAX_VALUE : timeout);
}

@@ -148,7 +148,7 @@ public class TestNodeCount {

/* check for timeout, then wait for cycleTime msec */
void checkTimeout(String testLabel, long cycleTime) throws TimeoutException {
if (Time.now() > failtime) {
if (Time.monotonicNow() > failtime) {
throw new TimeoutException("Timeout: "
+ testLabel + " for block " + lastBlock + " after " + timeout
+ " msec. Last counts: live = " + lastNum.liveReplicas()

@@ -17,7 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.blockmanagement;

import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

@@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.namenode.NameNodeAdapter;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.util.Time;
import org.junit.Test;

public class TestOverReplicatedBlocks {

@@ -171,10 +172,10 @@ public class TestOverReplicatedBlocks {
long waitTime = DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT * 1000 *
(DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT + 1);
do {
nodeInfo =
namesystem.getBlockManager().getDatanodeManager().getDatanode(dnReg);
lastHeartbeat = nodeInfo.getLastUpdate();
} while(now() - lastHeartbeat < waitTime);
nodeInfo = namesystem.getBlockManager().getDatanodeManager()
.getDatanode(dnReg);
lastHeartbeat = nodeInfo.getLastUpdateMonotonic();
} while (monotonicNow() - lastHeartbeat < waitTime);
fs.setReplication(fileName, (short)3);

BlockLocation locs[] = fs.getFileBlockLocations(

@@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.Block;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager.StatefulBlockInfo;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCState;

@@ -564,7 +565,7 @@ public class TestReplicationPolicy {
@Test
public void testChooseTargetWithStaleNodes() throws Exception {
// Set dataNodes[0] as stale
dataNodes[0].setLastUpdate(Time.now() - staleInterval - 1);
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[0], -(staleInterval + 1));
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
assertTrue(namenode.getNamesystem().getBlockManager()

@@ -584,7 +585,7 @@ public class TestReplicationPolicy {
assertFalse(isOnSameRack(targets[0], dataNodes[0]));

// reset
dataNodes[0].setLastUpdate(Time.now());
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[0], 0);
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();
}

@@ -601,7 +602,8 @@ public class TestReplicationPolicy {
public void testChooseTargetWithHalfStaleNodes() throws Exception {
// Set dataNodes[0], dataNodes[1], and dataNodes[2] as stale
for (int i = 0; i < 3; i++) {
dataNodes[i].setLastUpdate(Time.now() - staleInterval - 1);
DFSTestUtil
.resetLastUpdatesWithOffset(dataNodes[i], -(staleInterval + 1));
}
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();

@@ -633,7 +635,7 @@ public class TestReplicationPolicy {
assertTrue(containsWithinRange(dataNodes[5], targets, 0, 3));

for (int i = 0; i < dataNodes.length; i++) {
dataNodes[i].setLastUpdate(Time.now());
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
}
namenode.getNamesystem().getBlockManager()
.getDatanodeManager().getHeartbeatManager().heartbeatCheck();

@@ -659,9 +661,10 @@ public class TestReplicationPolicy {
for (int i = 0; i < 2; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
DatanodeDescriptor dnDes = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));
}
// Instead of waiting, explicitly call heartbeatCheck to
// let heartbeat manager to detect stale nodes

@@ -689,9 +692,9 @@ public class TestReplicationPolicy {
for (int i = 0; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, true);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
DatanodeDescriptor dnDesc = miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, -(staleInterval + 1));
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()

@@ -710,14 +713,15 @@ public class TestReplicationPolicy {
assertEquals(targets.length, 3);
assertTrue(isOnSameRack(targets[0], staleNodeInfo));

// Step 3. Set 2 stale datanodes back to healthy nodes,
// still have 2 stale nodes
for (int i = 2; i < 4; i++) {
DataNode dn = miniCluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
miniCluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now());
DatanodeDescriptor dnDesc = miniCluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDesc, 0);
}
// Explicitly call heartbeatCheck
miniCluster.getNameNode().getNamesystem().getBlockManager()

@@ -973,7 +977,7 @@ public class TestReplicationPolicy {

// Refresh the last update time for all the datanodes
for (int i = 0; i < dataNodes.length; i++) {
dataNodes[i].setLastUpdate(Time.now());
DFSTestUtil.resetLastUpdatesWithOffset(dataNodes[i], 0);
}

List<DatanodeStorageInfo> first = new ArrayList<DatanodeStorageInfo>();

@@ -660,12 +660,12 @@ public abstract class BlockReportTestBase {
final DataNode dn1 = cluster.getDataNodes().get(DN_N1);
String bpid = cluster.getNamesystem().getBlockPoolId();
Replica r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
long start = Time.now();
long start = Time.monotonicNow();
int count = 0;
while (r == null) {
waitTil(5);
r = DataNodeTestUtils.fetchReplicaInfo(dn1, bpid, bl.getBlockId());
long waiting_period = Time.now() - start;
long waiting_period = Time.monotonicNow() - start;
if (count++ % 100 == 0)
if(LOG.isDebugEnabled()) {
LOG.debug("Has been waiting for " + waiting_period + " ms.");

@@ -679,7 +679,7 @@ public abstract class BlockReportTestBase {
if(LOG.isDebugEnabled()) {
LOG.debug("Replica state before the loop " + state.getValue());
}
start = Time.now();
start = Time.monotonicNow();
while (state != HdfsServerConstants.ReplicaState.TEMPORARY) {
waitTil(5);
state = r.getState();

@@ -687,7 +687,7 @@ public abstract class BlockReportTestBase {
LOG.debug("Keep waiting for " + bl.getBlockName() +
" is in state " + state.getValue());
}
if (Time.now() - start > TIMEOUT)
if (Time.monotonicNow() - start > TIMEOUT)
assertTrue("Was waiting too long for a replica to become TEMPORARY",
tooLongWait);
}

@@ -73,7 +73,7 @@ public class TestBlockReplacement {
long bandwidthPerSec = 1024*1024L;
final long TOTAL_BYTES =6*bandwidthPerSec;
long bytesToSend = TOTAL_BYTES;
long start = Time.now();
long start = Time.monotonicNow();
DataTransferThrottler throttler = new DataTransferThrottler(bandwidthPerSec);
long totalBytes = 0L;
long bytesSent = 1024*512L; // 0.5MB

@@ -86,7 +86,7 @@ public class TestBlockReplacement {
Thread.sleep(1000);
} catch (InterruptedException ignored) {}
throttler.throttle(bytesToSend);
long end = Time.now();
long end = Time.monotonicNow();
assertTrue(totalBytes*1000/(end-start)<=bandwidthPerSec);
}

@@ -254,7 +254,7 @@ public class TestBlockReplacement {
throws IOException, TimeoutException {
boolean notDone;
final long TIMEOUT = 20000L;
long starttime = Time.now();
long starttime = Time.monotonicNow();
long failtime = starttime + TIMEOUT;
do {
try {

@@ -279,7 +279,7 @@ public class TestBlockReplacement {
}
}
}
if (Time.now() > failtime) {
if (Time.monotonicNow() > failtime) {
String expectedNodesList = "";
String currentNodesList = "";
for (DatanodeInfo dn : includeNodes)

@@ -33,6 +33,7 @@ import org.apache.hadoop.fs.DF;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.DFSOutputStream;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.HdfsConfiguration;

@@ -199,7 +200,7 @@ public class TestNamenodeCapacityReport {
DataNode dn = datanodes.get(i);
DatanodeDescriptor dnd = dnm.getDatanode(dn.getDatanodeId());
dn.shutdown();
dnd.setLastUpdate(0L);
DFSTestUtil.setDatanodeDead(dnd);
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
expectedInServiceNodes--;
assertEquals(expectedInServiceNodes, namesystem.getNumLiveDataNodes());

@@ -278,7 +279,7 @@ public class TestNamenodeCapacityReport {
dn.shutdown();
// force it to appear dead so live count decreases
DatanodeDescriptor dnDesc = dnm.getDatanode(dn.getDatanodeId());
dnDesc.setLastUpdate(0L);
DFSTestUtil.setDatanodeDead(dnDesc);
BlockManagerTestUtil.checkHeartbeat(namesystem.getBlockManager());
assertEquals(nodes-1-i, namesystem.getNumLiveDataNodes());
// first few nodes are already out of service

@@ -43,6 +43,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerTestUtil;
import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNodeTestUtils;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;

@@ -162,9 +163,10 @@ public class TestNameNodeMetrics {
long staleInterval = CONF.getLong(
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY,
DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_DEFAULT);
cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now() - staleInterval - 1);
DatanodeDescriptor dnDes = cluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDes, -(staleInterval + 1));
}
// Let HeartbeatManager to check heartbeat
BlockManagerTestUtil.checkHeartbeat(cluster.getNameNode().getNamesystem()

@@ -175,9 +177,10 @@ public class TestNameNodeMetrics {
for (int i = 0; i < 2; i++) {
DataNode dn = cluster.getDataNodes().get(i);
DataNodeTestUtils.setHeartbeatsDisabledForTests(dn, false);
cluster.getNameNode().getNamesystem().getBlockManager()
.getDatanodeManager().getDatanode(dn.getDatanodeId())
.setLastUpdate(Time.now());
DatanodeDescriptor dnDes = cluster.getNameNode().getNamesystem()
.getBlockManager().getDatanodeManager()
.getDatanode(dn.getDatanodeId());
DFSTestUtil.resetLastUpdatesWithOffset(dnDes, 0);
}

// Let HeartbeatManager to refresh
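Taken together, the hunks above replace wall-clock reads with the monotonic clock wherever an interval is being measured. In Hadoop's org.apache.hadoop.util.Time, now() tracks System.currentTimeMillis() while monotonicNow() is derived from System.nanoTime(), so only the latter is safe against the system clock being stepped. A small illustrative sketch (the class name is a placeholder, not code from this commit):

import org.apache.hadoop.util.Time;

// Illustrative comparison of the two clocks; only monotonicNow() gives an
// elapsed value that cannot jump if the system time is adjusted mid-test.
public class ElapsedTimeSketch {
  public static void main(String[] args) throws InterruptedException {
    long wallStart = Time.now();          // wall clock, can move backwards
    long monoStart = Time.monotonicNow(); // monotonic, only moves forward
    Thread.sleep(100);
    System.out.println("wall elapsed ms = " + (Time.now() - wallStart));
    System.out.println("mono elapsed ms = " + (Time.monotonicNow() - monoStart));
  }
}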