HDFS-4122. Merging change r1403120 from trunk
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1403579 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
17fc6ed3b8
commit
7634ab0bef
@ -4,6 +4,9 @@ Release 2.0.3-alpha - Unreleased
|
|||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
||||||
|
HDFS-4122. Cleanup HDFS logs and reduce the size of logged messages.
|
||||||
|
(suresh)
|
||||||
|
|
||||||
NEW FEATURES
|
NEW FEATURES
|
||||||
|
|
||||||
HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
|
HDFS-2656. Add libwebhdfs, a pure C client based on WebHDFS.
|
||||||
|
@ -653,7 +653,7 @@ void abort() {
|
|||||||
// if there is no more clients under the renewer.
|
// if there is no more clients under the renewer.
|
||||||
getLeaseRenewer().closeClient(this);
|
getLeaseRenewer().closeClient(this);
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.info("Exception occurred while aborting the client. " + ioe);
|
LOG.info("Exception occurred while aborting the client " + ioe);
|
||||||
}
|
}
|
||||||
closeConnectionToNamenode();
|
closeConnectionToNamenode();
|
||||||
}
|
}
|
||||||
@ -2137,7 +2137,7 @@ void reportChecksumFailure(String file, LocatedBlock lblocks[]) {
|
|||||||
reportBadBlocks(lblocks);
|
reportBadBlocks(lblocks);
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
LOG.info("Found corruption while reading " + file
|
LOG.info("Found corruption while reading " + file
|
||||||
+ ". Error repairing corrupt blocks. Bad blocks remain.", ie);
|
+ ". Error repairing corrupt blocks. Bad blocks remain.", ie);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -457,7 +457,7 @@ private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
|
|||||||
buffersize, verifyChecksum, dfsClient.clientName);
|
buffersize, verifyChecksum, dfsClient.clientName);
|
||||||
if(connectFailedOnce) {
|
if(connectFailedOnce) {
|
||||||
DFSClient.LOG.info("Successfully connected to " + targetAddr +
|
DFSClient.LOG.info("Successfully connected to " + targetAddr +
|
||||||
" for block " + blk.getBlockId());
|
" for " + blk);
|
||||||
}
|
}
|
||||||
return chosenNode;
|
return chosenNode;
|
||||||
} catch (IOException ex) {
|
} catch (IOException ex) {
|
||||||
@ -736,9 +736,9 @@ private DNAddrPair chooseDataNode(LocatedBlock block)
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (nodes == null || nodes.length == 0) {
|
if (nodes == null || nodes.length == 0) {
|
||||||
DFSClient.LOG.info("No node available for block: " + blockInfo);
|
DFSClient.LOG.info("No node available for " + blockInfo);
|
||||||
}
|
}
|
||||||
DFSClient.LOG.info("Could not obtain block " + block.getBlock()
|
DFSClient.LOG.info("Could not obtain " + block.getBlock()
|
||||||
+ " from any node: " + ie
|
+ " from any node: " + ie
|
||||||
+ ". Will get new block locations from namenode and retry...");
|
+ ". Will get new block locations from namenode and retry...");
|
||||||
try {
|
try {
|
||||||
|
@ -740,7 +740,7 @@ void close() {
|
|||||||
//
|
//
|
||||||
private boolean processDatanodeError() throws IOException {
|
private boolean processDatanodeError() throws IOException {
|
||||||
if (response != null) {
|
if (response != null) {
|
||||||
DFSClient.LOG.info("Error Recovery for block " + block +
|
DFSClient.LOG.info("Error Recovery for " + block +
|
||||||
" waiting for responder to exit. ");
|
" waiting for responder to exit. ");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
@ -1013,7 +1013,7 @@ private DatanodeInfo[] nextBlockOutputStream(String client) throws IOException {
|
|||||||
success = createBlockOutputStream(nodes, 0L, false);
|
success = createBlockOutputStream(nodes, 0L, false);
|
||||||
|
|
||||||
if (!success) {
|
if (!success) {
|
||||||
DFSClient.LOG.info("Abandoning block " + block);
|
DFSClient.LOG.info("Abandoning " + block);
|
||||||
dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
|
dfsClient.namenode.abandonBlock(block, src, dfsClient.clientName);
|
||||||
block = null;
|
block = null;
|
||||||
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
|
DFSClient.LOG.info("Excluding datanode " + nodes[errorIndex]);
|
||||||
@ -1781,7 +1781,7 @@ private void completeFile(ExtendedBlock last) throws IOException {
|
|||||||
try {
|
try {
|
||||||
Thread.sleep(400);
|
Thread.sleep(400);
|
||||||
if (Time.now() - localstart > 5000) {
|
if (Time.now() - localstart > 5000) {
|
||||||
DFSClient.LOG.info("Could not complete file " + src + " retrying...");
|
DFSClient.LOG.info("Could not complete " + src + " retrying...");
|
||||||
}
|
}
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
}
|
}
|
||||||
|
@ -746,7 +746,7 @@ public boolean reportChecksumFailure(Path f,
|
|||||||
}
|
}
|
||||||
DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()};
|
DatanodeInfo[] dataNode = {dfsIn.getCurrentDatanode()};
|
||||||
lblocks[0] = new LocatedBlock(dataBlock, dataNode);
|
lblocks[0] = new LocatedBlock(dataBlock, dataNode);
|
||||||
LOG.info("Found checksum error in data stream at block="
|
LOG.info("Found checksum error in data stream at "
|
||||||
+ dataBlock + " on datanode="
|
+ dataBlock + " on datanode="
|
||||||
+ dataNode[0]);
|
+ dataNode[0]);
|
||||||
|
|
||||||
@ -759,7 +759,7 @@ public boolean reportChecksumFailure(Path f,
|
|||||||
}
|
}
|
||||||
DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
|
DatanodeInfo[] sumsNode = {dfsSums.getCurrentDatanode()};
|
||||||
lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
|
lblocks[1] = new LocatedBlock(sumsBlock, sumsNode);
|
||||||
LOG.info("Found checksum error in checksum stream at block="
|
LOG.info("Found checksum error in checksum stream at "
|
||||||
+ sumsBlock + " on datanode=" + sumsNode[0]);
|
+ sumsBlock + " on datanode=" + sumsNode[0]);
|
||||||
|
|
||||||
// Ask client to delete blocks.
|
// Ask client to delete blocks.
|
||||||
|
@ -951,8 +951,8 @@ private void addToInvalidates(Block b) {
|
|||||||
datanodes.append(node).append(" ");
|
datanodes.append(node).append(" ");
|
||||||
}
|
}
|
||||||
if (datanodes.length() != 0) {
|
if (datanodes.length() != 0) {
|
||||||
NameNode.stateChangeLog.info("BLOCK* addToInvalidates: "
|
NameNode.stateChangeLog.info("BLOCK* addToInvalidates: " + b + " "
|
||||||
+ b + " to " + datanodes.toString());
|
+ datanodes);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -973,7 +973,7 @@ public void findAndMarkBlockAsCorrupt(final ExtendedBlock blk,
|
|||||||
// thread of Datanode reports bad block before Block reports are sent
|
// thread of Datanode reports bad block before Block reports are sent
|
||||||
// by the Datanode on startup
|
// by the Datanode on startup
|
||||||
NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
|
NameNode.stateChangeLog.info("BLOCK* findAndMarkBlockAsCorrupt: "
|
||||||
+ blk + " not found.");
|
+ blk + " not found");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
|
markBlockAsCorrupt(new BlockToMarkCorrupt(storedBlock, reason), dn);
|
||||||
@ -1027,7 +1027,7 @@ private void invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
|
|||||||
NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " +
|
NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: postponing " +
|
||||||
"invalidation of " + b + " on " + dn + " because " +
|
"invalidation of " + b + " on " + dn + " because " +
|
||||||
nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
|
nr.replicasOnStaleNodes() + " replica(s) are located on nodes " +
|
||||||
"with potentially out-of-date block reports.");
|
"with potentially out-of-date block reports");
|
||||||
postponeBlock(b.corrupted);
|
postponeBlock(b.corrupted);
|
||||||
|
|
||||||
} else if (nr.liveReplicas() >= 1) {
|
} else if (nr.liveReplicas() >= 1) {
|
||||||
@ -1040,7 +1040,7 @@ private void invalidateBlock(BlockToMarkCorrupt b, DatanodeInfo dn
|
|||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + b
|
NameNode.stateChangeLog.info("BLOCK* invalidateBlocks: " + b
|
||||||
+ " on " + dn + " is the only copy and was not deleted.");
|
+ " on " + dn + " is the only copy and was not deleted");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1161,9 +1161,8 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
|
|||||||
(blockHasEnoughRacks(block)) ) {
|
(blockHasEnoughRacks(block)) ) {
|
||||||
neededReplications.remove(block, priority); // remove from neededReplications
|
neededReplications.remove(block, priority); // remove from neededReplications
|
||||||
neededReplications.decrementReplicationIndex(priority);
|
neededReplications.decrementReplicationIndex(priority);
|
||||||
NameNode.stateChangeLog.info("BLOCK* "
|
NameNode.stateChangeLog.info("BLOCK* Removing " + block
|
||||||
+ "Removing block " + block
|
+ " from neededReplications as it has enough replicas");
|
||||||
+ " from neededReplications as it has enough replicas.");
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1237,9 +1236,8 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
|
|||||||
neededReplications.remove(block, priority); // remove from neededReplications
|
neededReplications.remove(block, priority); // remove from neededReplications
|
||||||
neededReplications.decrementReplicationIndex(priority);
|
neededReplications.decrementReplicationIndex(priority);
|
||||||
rw.targets = null;
|
rw.targets = null;
|
||||||
NameNode.stateChangeLog.info("BLOCK* "
|
NameNode.stateChangeLog.info("BLOCK* Removing " + block
|
||||||
+ "Removing block " + block
|
+ " from neededReplications as it has enough replicas");
|
||||||
+ " from neededReplications as it has enough replicas.");
|
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1291,10 +1289,8 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
|
|||||||
targetList.append(' ');
|
targetList.append(' ');
|
||||||
targetList.append(targets[k]);
|
targetList.append(targets[k]);
|
||||||
}
|
}
|
||||||
NameNode.stateChangeLog.info(
|
NameNode.stateChangeLog.info("BLOCK* ask " + rw.srcNode
|
||||||
"BLOCK* ask "
|
+ " to replicate " + rw.block + " to " + targetList);
|
||||||
+ rw.srcNode + " to replicate "
|
|
||||||
+ rw.block + " to " + targetList);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1528,10 +1524,9 @@ public void processReport(final DatanodeID nodeID, final String poolId,
|
|||||||
boolean staleBefore = node.areBlockContentsStale();
|
boolean staleBefore = node.areBlockContentsStale();
|
||||||
node.receivedBlockReport();
|
node.receivedBlockReport();
|
||||||
if (staleBefore && !node.areBlockContentsStale()) {
|
if (staleBefore && !node.areBlockContentsStale()) {
|
||||||
LOG.info("BLOCK* processReport: " +
|
LOG.info("BLOCK* processReport: Received first block report from "
|
||||||
"Received first block report from " + node +
|
+ node + " after becoming active. Its block contents are no longer"
|
||||||
" after becoming active. Its block contents are no longer" +
|
+ " considered stale");
|
||||||
" considered stale.");
|
|
||||||
rescanPostponedMisreplicatedBlocks();
|
rescanPostponedMisreplicatedBlocks();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1602,9 +1597,9 @@ private void processReport(final DatanodeDescriptor node,
|
|||||||
addStoredBlock(b, node, null, true);
|
addStoredBlock(b, node, null, true);
|
||||||
}
|
}
|
||||||
for (Block b : toInvalidate) {
|
for (Block b : toInvalidate) {
|
||||||
NameNode.stateChangeLog.info("BLOCK* processReport: block "
|
NameNode.stateChangeLog.info("BLOCK* processReport: "
|
||||||
+ b + " on " + node + " size " + b.getNumBytes()
|
+ b + " on " + node + " size " + b.getNumBytes()
|
||||||
+ " does not belong to any file.");
|
+ " does not belong to any file");
|
||||||
addToInvalidates(b, node);
|
addToInvalidates(b, node);
|
||||||
}
|
}
|
||||||
for (BlockToMarkCorrupt b : toCorrupt) {
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
||||||
@ -1871,7 +1866,7 @@ public void processAllPendingDNMessages() throws IOException {
|
|||||||
int count = pendingDNMessages.count();
|
int count = pendingDNMessages.count();
|
||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
LOG.info("Processing " + count + " messages from DataNodes " +
|
LOG.info("Processing " + count + " messages from DataNodes " +
|
||||||
"that were previously queued during standby state.");
|
"that were previously queued during standby state");
|
||||||
}
|
}
|
||||||
processQueuedMessages(pendingDNMessages.takeAll());
|
processQueuedMessages(pendingDNMessages.takeAll());
|
||||||
assert pendingDNMessages.count() == 0;
|
assert pendingDNMessages.count() == 0;
|
||||||
@ -1928,9 +1923,9 @@ private BlockToMarkCorrupt checkReplicaCorrupt(
|
|||||||
// the block report got a little bit delayed after the pipeline
|
// the block report got a little bit delayed after the pipeline
|
||||||
// closed. So, ignore this report, assuming we will get a
|
// closed. So, ignore this report, assuming we will get a
|
||||||
// FINALIZED replica later. See HDFS-2791
|
// FINALIZED replica later. See HDFS-2791
|
||||||
LOG.info("Received an RBW replica for block " + storedBlock +
|
LOG.info("Received an RBW replica for " + storedBlock +
|
||||||
" on " + dn + ": ignoring it, since the block is " +
|
" on " + dn + ": ignoring it, since it is " +
|
||||||
"complete with the same generation stamp.");
|
"complete with the same genstamp");
|
||||||
return null;
|
return null;
|
||||||
} else {
|
} else {
|
||||||
return new BlockToMarkCorrupt(storedBlock,
|
return new BlockToMarkCorrupt(storedBlock,
|
||||||
@ -2042,7 +2037,7 @@ private Block addStoredBlock(final BlockInfo block,
|
|||||||
// If this block does not belong to anyfile, then we are done.
|
// If this block does not belong to anyfile, then we are done.
|
||||||
NameNode.stateChangeLog.info("BLOCK* addStoredBlock: " + block + " on "
|
NameNode.stateChangeLog.info("BLOCK* addStoredBlock: " + block + " on "
|
||||||
+ node + " size " + block.getNumBytes()
|
+ node + " size " + block.getNumBytes()
|
||||||
+ " but it does not belong to any file.");
|
+ " but it does not belong to any file");
|
||||||
// we could add this block to invalidate set of this datanode.
|
// we could add this block to invalidate set of this datanode.
|
||||||
// it will happen in next block report otherwise.
|
// it will happen in next block report otherwise.
|
||||||
return block;
|
return block;
|
||||||
@ -2159,9 +2154,8 @@ private void invalidateCorruptReplicas(BlockInfo blk) {
|
|||||||
try {
|
try {
|
||||||
invalidateBlock(new BlockToMarkCorrupt(blk, null), node);
|
invalidateBlock(new BlockToMarkCorrupt(blk, null), node);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
NameNode.stateChangeLog.info("NameNode.invalidateCorruptReplicas " +
|
NameNode.stateChangeLog.info("invalidateCorruptReplicas "
|
||||||
"error in deleting bad block " + blk +
|
+ "error in deleting bad block " + blk + " on " + node, e);
|
||||||
" on " + node, e);
|
|
||||||
gotException = true;
|
gotException = true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2309,7 +2303,7 @@ private void processOverReplicatedBlock(final Block block,
|
|||||||
DatanodeDescriptor cur = it.next();
|
DatanodeDescriptor cur = it.next();
|
||||||
if (cur.areBlockContentsStale()) {
|
if (cur.areBlockContentsStale()) {
|
||||||
LOG.info("BLOCK* processOverReplicatedBlock: " +
|
LOG.info("BLOCK* processOverReplicatedBlock: " +
|
||||||
"Postponing processing of over-replicated block " +
|
"Postponing processing of over-replicated " +
|
||||||
block + " since datanode " + cur + " does not yet have up-to-date " +
|
block + " since datanode " + cur + " does not yet have up-to-date " +
|
||||||
"block information.");
|
"block information.");
|
||||||
postponeBlock(block);
|
postponeBlock(block);
|
||||||
@ -2427,7 +2421,7 @@ private void chooseExcessReplicates(Collection<DatanodeDescriptor> nonExcess,
|
|||||||
//
|
//
|
||||||
addToInvalidates(b, cur);
|
addToInvalidates(b, cur);
|
||||||
NameNode.stateChangeLog.info("BLOCK* chooseExcessReplicates: "
|
NameNode.stateChangeLog.info("BLOCK* chooseExcessReplicates: "
|
||||||
+"("+cur+", "+b+") is added to invalidated blocks set.");
|
+"("+cur+", "+b+") is added to invalidated blocks set");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2569,7 +2563,7 @@ private void processAndHandleReportedBlock(DatanodeDescriptor node, Block block,
|
|||||||
for (Block b : toInvalidate) {
|
for (Block b : toInvalidate) {
|
||||||
NameNode.stateChangeLog.info("BLOCK* addBlock: block "
|
NameNode.stateChangeLog.info("BLOCK* addBlock: block "
|
||||||
+ b + " on " + node + " size " + b.getNumBytes()
|
+ b + " on " + node + " size " + b.getNumBytes()
|
||||||
+ " does not belong to any file.");
|
+ " does not belong to any file");
|
||||||
addToInvalidates(b, node);
|
addToInvalidates(b, node);
|
||||||
}
|
}
|
||||||
for (BlockToMarkCorrupt b : toCorrupt) {
|
for (BlockToMarkCorrupt b : toCorrupt) {
|
||||||
@ -2680,7 +2674,7 @@ public NumberReplicas countNodes(Block b) {
|
|||||||
* of live nodes. If in startup safemode (or its 30-sec extension period),
|
* of live nodes. If in startup safemode (or its 30-sec extension period),
|
||||||
* then it gains speed by ignoring issues of excess replicas or nodes
|
* then it gains speed by ignoring issues of excess replicas or nodes
|
||||||
* that are decommissioned or in process of becoming decommissioned.
|
* that are decommissioned or in process of becoming decommissioned.
|
||||||
* If not in startup, then it calls {@link countNodes()} instead.
|
* If not in startup, then it calls {@link #countNodes(Block)} instead.
|
||||||
*
|
*
|
||||||
* @param b - the block being tested
|
* @param b - the block being tested
|
||||||
* @return count of live nodes for this block
|
* @return count of live nodes for this block
|
||||||
|
@ -366,8 +366,7 @@ void addBlockToBeReplicated(Block block, DatanodeDescriptor[] targets) {
|
|||||||
void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
|
void addBlockToBeRecovered(BlockInfoUnderConstruction block) {
|
||||||
if(recoverBlocks.contains(block)) {
|
if(recoverBlocks.contains(block)) {
|
||||||
// this prevents adding the same block twice to the recovery queue
|
// this prevents adding the same block twice to the recovery queue
|
||||||
BlockManager.LOG.info("Block " + block +
|
BlockManager.LOG.info(block + " is already in the recovery queue");
|
||||||
" is already in the recovery queue.");
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
recoverBlocks.offer(block);
|
recoverBlocks.offer(block);
|
||||||
|
@ -567,7 +567,7 @@ boolean checkDecommissionState(DatanodeDescriptor node) {
|
|||||||
if (node.isDecommissionInProgress()) {
|
if (node.isDecommissionInProgress()) {
|
||||||
if (!blockManager.isReplicationInProgress(node)) {
|
if (!blockManager.isReplicationInProgress(node)) {
|
||||||
node.setDecommissioned();
|
node.setDecommissioned();
|
||||||
LOG.info("Decommission complete for node " + node);
|
LOG.info("Decommission complete for " + node);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return node.isDecommissioned();
|
return node.isDecommissioned();
|
||||||
@ -576,8 +576,8 @@ boolean checkDecommissionState(DatanodeDescriptor node) {
|
|||||||
/** Start decommissioning the specified datanode. */
|
/** Start decommissioning the specified datanode. */
|
||||||
private void startDecommission(DatanodeDescriptor node) {
|
private void startDecommission(DatanodeDescriptor node) {
|
||||||
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
|
if (!node.isDecommissionInProgress() && !node.isDecommissioned()) {
|
||||||
LOG.info("Start Decommissioning node " + node + " with " +
|
LOG.info("Start Decommissioning " + node + " with " +
|
||||||
node.numBlocks() + " blocks.");
|
node.numBlocks() + " blocks");
|
||||||
heartbeatManager.startDecommission(node);
|
heartbeatManager.startDecommission(node);
|
||||||
node.decommissioningStatus.setStartTime(now());
|
node.decommissioningStatus.setStartTime(now());
|
||||||
|
|
||||||
@ -589,7 +589,7 @@ private void startDecommission(DatanodeDescriptor node) {
|
|||||||
/** Stop decommissioning the specified datanodes. */
|
/** Stop decommissioning the specified datanodes. */
|
||||||
void stopDecommission(DatanodeDescriptor node) {
|
void stopDecommission(DatanodeDescriptor node) {
|
||||||
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
if (node.isDecommissionInProgress() || node.isDecommissioned()) {
|
||||||
LOG.info("Stop Decommissioning node " + node);
|
LOG.info("Stop Decommissioning " + node);
|
||||||
heartbeatManager.stopDecommission(node);
|
heartbeatManager.stopDecommission(node);
|
||||||
blockManager.processOverReplicatedBlocksOnReCommission(node);
|
blockManager.processOverReplicatedBlocksOnReCommission(node);
|
||||||
}
|
}
|
||||||
@ -641,17 +641,15 @@ public void registerDatanode(DatanodeRegistration nodeReg)
|
|||||||
throw new DisallowedDatanodeException(nodeReg);
|
throw new DisallowedDatanodeException(nodeReg);
|
||||||
}
|
}
|
||||||
|
|
||||||
NameNode.stateChangeLog.info("BLOCK* NameSystem.registerDatanode: "
|
NameNode.stateChangeLog.info("BLOCK* registerDatanode: from "
|
||||||
+ "node registration from " + nodeReg
|
+ nodeReg + " storage " + nodeReg.getStorageID());
|
||||||
+ " storage " + nodeReg.getStorageID());
|
|
||||||
|
|
||||||
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
|
DatanodeDescriptor nodeS = datanodeMap.get(nodeReg.getStorageID());
|
||||||
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
|
DatanodeDescriptor nodeN = host2DatanodeMap.getDatanodeByXferAddr(
|
||||||
nodeReg.getIpAddr(), nodeReg.getXferPort());
|
nodeReg.getIpAddr(), nodeReg.getXferPort());
|
||||||
|
|
||||||
if (nodeN != null && nodeN != nodeS) {
|
if (nodeN != null && nodeN != nodeS) {
|
||||||
NameNode.LOG.info("BLOCK* NameSystem.registerDatanode: "
|
NameNode.LOG.info("BLOCK* registerDatanode: " + nodeN);
|
||||||
+ "node from name: " + nodeN);
|
|
||||||
// nodeN previously served a different data storage,
|
// nodeN previously served a different data storage,
|
||||||
// which is not served by anybody anymore.
|
// which is not served by anybody anymore.
|
||||||
removeDatanode(nodeN);
|
removeDatanode(nodeN);
|
||||||
@ -666,8 +664,8 @@ public void registerDatanode(DatanodeRegistration nodeReg)
|
|||||||
// storage. We do not need to remove old data blocks, the delta will
|
// storage. We do not need to remove old data blocks, the delta will
|
||||||
// be calculated on the next block report from the datanode
|
// be calculated on the next block report from the datanode
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("BLOCK* NameSystem.registerDatanode: "
|
NameNode.stateChangeLog.debug("BLOCK* registerDatanode: "
|
||||||
+ "node restarted.");
|
+ "node restarted.");
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// nodeS is found
|
// nodeS is found
|
||||||
@ -679,11 +677,9 @@ nodes with its data cleared (or user can just remove the StorageID
|
|||||||
value in "VERSION" file under the data directory of the datanode,
|
value in "VERSION" file under the data directory of the datanode,
|
||||||
but this is might not work if VERSION file format has changed
|
but this is might not work if VERSION file format has changed
|
||||||
*/
|
*/
|
||||||
NameNode.stateChangeLog.info( "BLOCK* NameSystem.registerDatanode: "
|
NameNode.stateChangeLog.info("BLOCK* registerDatanode: " + nodeS
|
||||||
+ "node " + nodeS
|
+ " is replaced by " + nodeReg + " with the same storageID "
|
||||||
+ " is replaced by " + nodeReg +
|
+ nodeReg.getStorageID());
|
||||||
" with the same storageID " +
|
|
||||||
nodeReg.getStorageID());
|
|
||||||
}
|
}
|
||||||
// update cluster map
|
// update cluster map
|
||||||
getNetworkTopology().remove(nodeS);
|
getNetworkTopology().remove(nodeS);
|
||||||
|
@ -430,7 +430,7 @@ public StorageState analyzeStorage(StartupOption startOpt, Storage storage)
|
|||||||
if (!root.exists()) {
|
if (!root.exists()) {
|
||||||
// storage directory does not exist
|
// storage directory does not exist
|
||||||
if (startOpt != StartupOption.FORMAT) {
|
if (startOpt != StartupOption.FORMAT) {
|
||||||
LOG.info("Storage directory " + rootPath + " does not exist.");
|
LOG.info("Storage directory " + rootPath + " does not exist");
|
||||||
return StorageState.NON_EXISTENT;
|
return StorageState.NON_EXISTENT;
|
||||||
}
|
}
|
||||||
LOG.info(rootPath + " does not exist. Creating ...");
|
LOG.info(rootPath + " does not exist. Creating ...");
|
||||||
@ -439,7 +439,7 @@ public StorageState analyzeStorage(StartupOption startOpt, Storage storage)
|
|||||||
}
|
}
|
||||||
// or is inaccessible
|
// or is inaccessible
|
||||||
if (!root.isDirectory()) {
|
if (!root.isDirectory()) {
|
||||||
LOG.info(rootPath + "is not a directory.");
|
LOG.info(rootPath + "is not a directory");
|
||||||
return StorageState.NON_EXISTENT;
|
return StorageState.NON_EXISTENT;
|
||||||
}
|
}
|
||||||
if (!root.canWrite()) {
|
if (!root.canWrite()) {
|
||||||
@ -536,34 +536,34 @@ public void doRecover(StorageState curState) throws IOException {
|
|||||||
switch(curState) {
|
switch(curState) {
|
||||||
case COMPLETE_UPGRADE: // mv previous.tmp -> previous
|
case COMPLETE_UPGRADE: // mv previous.tmp -> previous
|
||||||
LOG.info("Completing previous upgrade for storage directory "
|
LOG.info("Completing previous upgrade for storage directory "
|
||||||
+ rootPath + ".");
|
+ rootPath);
|
||||||
rename(getPreviousTmp(), getPreviousDir());
|
rename(getPreviousTmp(), getPreviousDir());
|
||||||
return;
|
return;
|
||||||
case RECOVER_UPGRADE: // mv previous.tmp -> current
|
case RECOVER_UPGRADE: // mv previous.tmp -> current
|
||||||
LOG.info("Recovering storage directory " + rootPath
|
LOG.info("Recovering storage directory " + rootPath
|
||||||
+ " from previous upgrade.");
|
+ " from previous upgrade");
|
||||||
if (curDir.exists())
|
if (curDir.exists())
|
||||||
deleteDir(curDir);
|
deleteDir(curDir);
|
||||||
rename(getPreviousTmp(), curDir);
|
rename(getPreviousTmp(), curDir);
|
||||||
return;
|
return;
|
||||||
case COMPLETE_ROLLBACK: // rm removed.tmp
|
case COMPLETE_ROLLBACK: // rm removed.tmp
|
||||||
LOG.info("Completing previous rollback for storage directory "
|
LOG.info("Completing previous rollback for storage directory "
|
||||||
+ rootPath + ".");
|
+ rootPath);
|
||||||
deleteDir(getRemovedTmp());
|
deleteDir(getRemovedTmp());
|
||||||
return;
|
return;
|
||||||
case RECOVER_ROLLBACK: // mv removed.tmp -> current
|
case RECOVER_ROLLBACK: // mv removed.tmp -> current
|
||||||
LOG.info("Recovering storage directory " + rootPath
|
LOG.info("Recovering storage directory " + rootPath
|
||||||
+ " from previous rollback.");
|
+ " from previous rollback");
|
||||||
rename(getRemovedTmp(), curDir);
|
rename(getRemovedTmp(), curDir);
|
||||||
return;
|
return;
|
||||||
case COMPLETE_FINALIZE: // rm finalized.tmp
|
case COMPLETE_FINALIZE: // rm finalized.tmp
|
||||||
LOG.info("Completing previous finalize for storage directory "
|
LOG.info("Completing previous finalize for storage directory "
|
||||||
+ rootPath + ".");
|
+ rootPath);
|
||||||
deleteDir(getFinalizedTmp());
|
deleteDir(getFinalizedTmp());
|
||||||
return;
|
return;
|
||||||
case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
|
case COMPLETE_CHECKPOINT: // mv lastcheckpoint.tmp -> previous.checkpoint
|
||||||
LOG.info("Completing previous checkpoint for storage directory "
|
LOG.info("Completing previous checkpoint for storage directory "
|
||||||
+ rootPath + ".");
|
+ rootPath);
|
||||||
File prevCkptDir = getPreviousCheckpoint();
|
File prevCkptDir = getPreviousCheckpoint();
|
||||||
if (prevCkptDir.exists())
|
if (prevCkptDir.exists())
|
||||||
deleteDir(prevCkptDir);
|
deleteDir(prevCkptDir);
|
||||||
@ -571,7 +571,7 @@ public void doRecover(StorageState curState) throws IOException {
|
|||||||
return;
|
return;
|
||||||
case RECOVER_CHECKPOINT: // mv lastcheckpoint.tmp -> current
|
case RECOVER_CHECKPOINT: // mv lastcheckpoint.tmp -> current
|
||||||
LOG.info("Recovering storage directory " + rootPath
|
LOG.info("Recovering storage directory " + rootPath
|
||||||
+ " from failed checkpoint.");
|
+ " from failed checkpoint");
|
||||||
if (curDir.exists())
|
if (curDir.exists())
|
||||||
deleteDir(curDir);
|
deleteDir(curDir);
|
||||||
rename(getLastCheckpointTmp(), curDir);
|
rename(getLastCheckpointTmp(), curDir);
|
||||||
@ -600,7 +600,7 @@ public void lock() throws IOException {
|
|||||||
FileLock newLock = tryLock();
|
FileLock newLock = tryLock();
|
||||||
if (newLock == null) {
|
if (newLock == null) {
|
||||||
String msg = "Cannot lock storage " + this.root
|
String msg = "Cannot lock storage " + this.root
|
||||||
+ ". The directory is already locked.";
|
+ ". The directory is already locked";
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
throw new IOException(msg);
|
throw new IOException(msg);
|
||||||
}
|
}
|
||||||
|
@ -637,8 +637,7 @@ private void sleepAndLogInterrupts(int millis,
|
|||||||
try {
|
try {
|
||||||
Thread.sleep(millis);
|
Thread.sleep(millis);
|
||||||
} catch (InterruptedException ie) {
|
} catch (InterruptedException ie) {
|
||||||
LOG.info("BPOfferService " + this +
|
LOG.info("BPOfferService " + this + " interrupted while " + stateString);
|
||||||
" interrupted while " + stateString);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,7 +154,7 @@ public int compareTo(BlockScanInfo other) {
|
|||||||
}
|
}
|
||||||
this.scanPeriod = hours * 3600 * 1000;
|
this.scanPeriod = hours * 3600 * 1000;
|
||||||
LOG.info("Periodic Block Verification Scanner initialized with interval "
|
LOG.info("Periodic Block Verification Scanner initialized with interval "
|
||||||
+ hours + " hours for block pool " + bpid + ".");
|
+ hours + " hours for block pool " + bpid);
|
||||||
|
|
||||||
// get the list of blocks and arrange them in random order
|
// get the list of blocks and arrange them in random order
|
||||||
List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
|
List<Block> arr = dataset.getFinalizedBlocks(blockPoolId);
|
||||||
@ -310,12 +310,12 @@ private synchronized void updateScanStatus(Block block,
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void handleScanFailure(ExtendedBlock block) {
|
private void handleScanFailure(ExtendedBlock block) {
|
||||||
LOG.info("Reporting bad block " + block);
|
LOG.info("Reporting bad " + block);
|
||||||
try {
|
try {
|
||||||
datanode.reportBadBlocks(block);
|
datanode.reportBadBlocks(block);
|
||||||
} catch (IOException ie) {
|
} catch (IOException ie) {
|
||||||
// it is bad, but not bad enough to shutdown the scanner
|
// it is bad, but not bad enough to shutdown the scanner
|
||||||
LOG.warn("Cannot report bad block=" + block.getBlockId());
|
LOG.warn("Cannot report bad " + block.getBlockId());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -411,7 +411,7 @@ void verifyBlock(ExtendedBlock block) {
|
|||||||
|
|
||||||
// If the block does not exists anymore, then its not an error
|
// If the block does not exists anymore, then its not an error
|
||||||
if (!dataset.contains(block)) {
|
if (!dataset.contains(block)) {
|
||||||
LOG.info(block + " is no longer in the dataset.");
|
LOG.info(block + " is no longer in the dataset");
|
||||||
deleteBlock(block.getLocalBlock());
|
deleteBlock(block.getLocalBlock());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -424,7 +424,7 @@ void verifyBlock(ExtendedBlock block) {
|
|||||||
// is a block really deleted by mistake, DirectoryScan should catch it.
|
// is a block really deleted by mistake, DirectoryScan should catch it.
|
||||||
if (e instanceof FileNotFoundException ) {
|
if (e instanceof FileNotFoundException ) {
|
||||||
LOG.info("Verification failed for " + block +
|
LOG.info("Verification failed for " + block +
|
||||||
". It may be due to race with write.");
|
" - may be due to race with write");
|
||||||
deleteBlock(block.getLocalBlock());
|
deleteBlock(block.getLocalBlock());
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -332,7 +332,7 @@ void doUpgrade(StorageDirectory bpSd, NamespaceInfo nsInfo) throws IOException {
|
|||||||
// 4.rename <SD>/curernt/<bpid>/previous.tmp to <SD>/curernt/<bpid>/previous
|
// 4.rename <SD>/curernt/<bpid>/previous.tmp to <SD>/curernt/<bpid>/previous
|
||||||
rename(bpTmpDir, bpPrevDir);
|
rename(bpTmpDir, bpPrevDir);
|
||||||
LOG.info("Upgrade of block pool " + blockpoolID + " at " + bpSd.getRoot()
|
LOG.info("Upgrade of block pool " + blockpoolID + " at " + bpSd.getRoot()
|
||||||
+ " is complete.");
|
+ " is complete");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -409,7 +409,7 @@ void doRollback(StorageDirectory bpSd, NamespaceInfo nsInfo)
|
|||||||
|
|
||||||
// 3. delete removed.tmp dir
|
// 3. delete removed.tmp dir
|
||||||
deleteDir(tmpDir);
|
deleteDir(tmpDir);
|
||||||
LOG.info("Rollback of " + bpSd.getRoot() + " is complete.");
|
LOG.info("Rollback of " + bpSd.getRoot() + " is complete");
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
|
@ -357,7 +357,7 @@ void flushOrSync(boolean isSync) throws IOException {
|
|||||||
private void handleMirrorOutError(IOException ioe) throws IOException {
|
private void handleMirrorOutError(IOException ioe) throws IOException {
|
||||||
String bpid = block.getBlockPoolId();
|
String bpid = block.getBlockPoolId();
|
||||||
LOG.info(datanode.getDNRegistrationForBP(bpid)
|
LOG.info(datanode.getDNRegistrationForBP(bpid)
|
||||||
+ ":Exception writing block " + block + " to mirror " + mirrorAddr, ioe);
|
+ ":Exception writing " + block + " to mirror " + mirrorAddr, ioe);
|
||||||
if (Thread.interrupted()) { // shut down if the thread is interrupted
|
if (Thread.interrupted()) { // shut down if the thread is interrupted
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} else { // encounter an error while writing to mirror
|
} else { // encounter an error while writing to mirror
|
||||||
@ -379,16 +379,16 @@ private void verifyChunks(ByteBuffer dataBuf, ByteBuffer checksumBuf)
|
|||||||
LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
|
LOG.warn("Checksum error in block " + block + " from " + inAddr, ce);
|
||||||
if (srcDataNode != null) {
|
if (srcDataNode != null) {
|
||||||
try {
|
try {
|
||||||
LOG.info("report corrupt block " + block + " from datanode " +
|
LOG.info("report corrupt " + block + " from datanode " +
|
||||||
srcDataNode + " to namenode");
|
srcDataNode + " to namenode");
|
||||||
datanode.reportRemoteBadBlock(srcDataNode, block);
|
datanode.reportRemoteBadBlock(srcDataNode, block);
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed to report bad block " + block +
|
LOG.warn("Failed to report bad " + block +
|
||||||
" from datanode " + srcDataNode + " to namenode");
|
" from datanode " + srcDataNode + " to namenode");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
throw new IOException("Unexpected checksum mismatch " +
|
throw new IOException("Unexpected checksum mismatch while writing "
|
||||||
"while writing " + block + " from " + inAddr);
|
+ block + " from " + inAddr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -518,7 +518,7 @@ private int receivePacket() throws IOException {
|
|||||||
// If this is a partial chunk, then read in pre-existing checksum
|
// If this is a partial chunk, then read in pre-existing checksum
|
||||||
if (firstByteInBlock % bytesPerChecksum != 0) {
|
if (firstByteInBlock % bytesPerChecksum != 0) {
|
||||||
LOG.info("Packet starts at " + firstByteInBlock +
|
LOG.info("Packet starts at " + firstByteInBlock +
|
||||||
" for block " + block +
|
" for " + block +
|
||||||
" which is not a multiple of bytesPerChecksum " +
|
" which is not a multiple of bytesPerChecksum " +
|
||||||
bytesPerChecksum);
|
bytesPerChecksum);
|
||||||
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
|
long offsetInChecksum = BlockMetadataHeader.getHeaderSize() +
|
||||||
@ -662,7 +662,7 @@ void receiveBlock(
|
|||||||
}
|
}
|
||||||
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.info("Exception in receiveBlock for " + block, ioe);
|
LOG.info("Exception for " + block, ioe);
|
||||||
throw ioe;
|
throw ioe;
|
||||||
} finally {
|
} finally {
|
||||||
if (!responderClosed) { // Abnormal termination of the flow above
|
if (!responderClosed) { // Abnormal termination of the flow above
|
||||||
@ -733,10 +733,9 @@ private void computePartialChunkCrc(long blkoff, long ckoff,
|
|||||||
int checksumSize = diskChecksum.getChecksumSize();
|
int checksumSize = diskChecksum.getChecksumSize();
|
||||||
blkoff = blkoff - sizePartialChunk;
|
blkoff = blkoff - sizePartialChunk;
|
||||||
LOG.info("computePartialChunkCrc sizePartialChunk " +
|
LOG.info("computePartialChunkCrc sizePartialChunk " +
|
||||||
sizePartialChunk +
|
sizePartialChunk + " " + block +
|
||||||
" block " + block +
|
" block offset " + blkoff +
|
||||||
" offset in block " + blkoff +
|
" metafile offset " + ckoff);
|
||||||
" offset in metafile " + ckoff);
|
|
||||||
|
|
||||||
// create an input stream from the block file
|
// create an input stream from the block file
|
||||||
// and read in partial crc chunk into temporary buffer
|
// and read in partial crc chunk into temporary buffer
|
||||||
@ -758,7 +757,7 @@ private void computePartialChunkCrc(long blkoff, long ckoff,
|
|||||||
partialCrc = DataChecksum.newDataChecksum(
|
partialCrc = DataChecksum.newDataChecksum(
|
||||||
diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
|
diskChecksum.getChecksumType(), diskChecksum.getBytesPerChecksum());
|
||||||
partialCrc.update(buf, 0, sizePartialChunk);
|
partialCrc.update(buf, 0, sizePartialChunk);
|
||||||
LOG.info("Read in partial CRC chunk from disk for block " + block);
|
LOG.info("Read in partial CRC chunk from disk for " + block);
|
||||||
|
|
||||||
// paranoia! verify that the pre-computed crc matches what we
|
// paranoia! verify that the pre-computed crc matches what we
|
||||||
// recalculated just now
|
// recalculated just now
|
||||||
@ -973,7 +972,7 @@ public void run() {
|
|||||||
"HDFS_WRITE", clientname, offset,
|
"HDFS_WRITE", clientname, offset,
|
||||||
dnR.getStorageID(), block, endTime-startTime));
|
dnR.getStorageID(), block, endTime-startTime));
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Received block " + block + " of size "
|
LOG.info("Received " + block + " size "
|
||||||
+ block.getNumBytes() + " from " + inAddr);
|
+ block.getNumBytes() + " from " + inAddr);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -503,7 +503,7 @@ private int sendPacket(ByteBuffer pkt, int maxChunks, OutputStream out,
|
|||||||
* part of a block and then decides not to read the rest (but leaves
|
* part of a block and then decides not to read the rest (but leaves
|
||||||
* the socket open).
|
* the socket open).
|
||||||
*/
|
*/
|
||||||
LOG.info("BlockSender.sendChunks() exception: ", e);
|
LOG.info("exception: ", e);
|
||||||
} else {
|
} else {
|
||||||
/* Exception while writing to the client. Connection closure from
|
/* Exception while writing to the client. Connection closure from
|
||||||
* the other end is mostly the case and we do not care much about
|
* the other end is mostly the case and we do not care much about
|
||||||
|
@ -481,8 +481,7 @@ private synchronized void initDataBlockScanner(Configuration conf) {
|
|||||||
blockScanner = new DataBlockScanner(this, data, conf);
|
blockScanner = new DataBlockScanner(this, data, conf);
|
||||||
blockScanner.start();
|
blockScanner.start();
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Periodic Block Verification scan is disabled because " +
|
LOG.info("Periodic Block Verification scan disabled because " + reason);
|
||||||
reason + ".");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -511,7 +510,7 @@ private synchronized void initDirectoryScanner(Configuration conf) {
|
|||||||
directoryScanner.start();
|
directoryScanner.start();
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Periodic Directory Tree Verification scan is disabled because " +
|
LOG.info("Periodic Directory Tree Verification scan is disabled because " +
|
||||||
reason + ".");
|
reason);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1262,7 +1261,7 @@ private void transferBlock(ExtendedBlock block, DatanodeInfo xferTargets[])
|
|||||||
xfersBuilder.append(xferTargets[i]);
|
xfersBuilder.append(xferTargets[i]);
|
||||||
xfersBuilder.append(" ");
|
xfersBuilder.append(" ");
|
||||||
}
|
}
|
||||||
LOG.info(bpReg + " Starting thread to transfer block " +
|
LOG.info(bpReg + " Starting thread to transfer " +
|
||||||
block + " to " + xfersBuilder);
|
block + " to " + xfersBuilder);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2049,7 +2048,7 @@ private static void logRecoverBlock(String who, RecoveringBlock rb) {
|
|||||||
ExtendedBlock block = rb.getBlock();
|
ExtendedBlock block = rb.getBlock();
|
||||||
DatanodeInfo[] targets = rb.getLocations();
|
DatanodeInfo[] targets = rb.getLocations();
|
||||||
|
|
||||||
LOG.info(who + " calls recoverBlock(block=" + block
|
LOG.info(who + " calls recoverBlock(" + block
|
||||||
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
|
+ ", targets=[" + Joiner.on(", ").join(targets) + "]"
|
||||||
+ ", newGenerationStamp=" + rb.getNewGenerationStamp() + ")");
|
+ ", newGenerationStamp=" + rb.getNewGenerationStamp() + ")");
|
||||||
}
|
}
|
||||||
|
@ -155,11 +155,11 @@ synchronized void recoverTransitionRead(DataNode datanode,
|
|||||||
break;
|
break;
|
||||||
case NON_EXISTENT:
|
case NON_EXISTENT:
|
||||||
// ignore this storage
|
// ignore this storage
|
||||||
LOG.info("Storage directory " + dataDir + " does not exist.");
|
LOG.info("Storage directory " + dataDir + " does not exist");
|
||||||
it.remove();
|
it.remove();
|
||||||
continue;
|
continue;
|
||||||
case NOT_FORMATTED: // format
|
case NOT_FORMATTED: // format
|
||||||
LOG.info("Storage directory " + dataDir + " is not formatted.");
|
LOG.info("Storage directory " + dataDir + " is not formatted");
|
||||||
LOG.info("Formatting ...");
|
LOG.info("Formatting ...");
|
||||||
format(sd, nsInfo);
|
format(sd, nsInfo);
|
||||||
break;
|
break;
|
||||||
@ -482,7 +482,7 @@ void doUpgrade(StorageDirectory sd, NamespaceInfo nsInfo) throws IOException {
|
|||||||
|
|
||||||
// 5. Rename <SD>/previous.tmp to <SD>/previous
|
// 5. Rename <SD>/previous.tmp to <SD>/previous
|
||||||
rename(tmpDir, prevDir);
|
rename(tmpDir, prevDir);
|
||||||
LOG.info("Upgrade of " + sd.getRoot()+ " is complete.");
|
LOG.info("Upgrade of " + sd.getRoot()+ " is complete");
|
||||||
addBlockPoolStorage(nsInfo.getBlockPoolID(), bpStorage);
|
addBlockPoolStorage(nsInfo.getBlockPoolID(), bpStorage);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -556,7 +556,7 @@ void doRollback( StorageDirectory sd,
|
|||||||
rename(prevDir, curDir);
|
rename(prevDir, curDir);
|
||||||
// delete tmp dir
|
// delete tmp dir
|
||||||
deleteDir(tmpDir);
|
deleteDir(tmpDir);
|
||||||
LOG.info("Rollback of " + sd.getRoot() + " is complete.");
|
LOG.info("Rollback of " + sd.getRoot() + " is complete");
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -596,9 +596,9 @@ public void run() {
|
|||||||
deleteDir(bbwDir);
|
deleteDir(bbwDir);
|
||||||
}
|
}
|
||||||
} catch(IOException ex) {
|
} catch(IOException ex) {
|
||||||
LOG.error("Finalize upgrade for " + dataDirPath + " failed.", ex);
|
LOG.error("Finalize upgrade for " + dataDirPath + " failed", ex);
|
||||||
}
|
}
|
||||||
LOG.info("Finalize upgrade for " + dataDirPath + " is complete.");
|
LOG.info("Finalize upgrade for " + dataDirPath + " is complete");
|
||||||
}
|
}
|
||||||
@Override
|
@Override
|
||||||
public String toString() { return "Finalize " + dataDirPath; }
|
public String toString() { return "Finalize " + dataDirPath; }
|
||||||
|
@ -170,7 +170,7 @@ public void run() {
|
|||||||
} catch (InvalidMagicNumberException imne) {
|
} catch (InvalidMagicNumberException imne) {
|
||||||
LOG.info("Failed to read expected encryption handshake from client " +
|
LOG.info("Failed to read expected encryption handshake from client " +
|
||||||
"at " + s.getInetAddress() + ". Perhaps the client is running an " +
|
"at " + s.getInetAddress() + ". Perhaps the client is running an " +
|
||||||
"older version of Hadoop which does not support encryption.");
|
"older version of Hadoop which does not support encryption");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
input = encryptedStreams.in;
|
input = encryptedStreams.in;
|
||||||
@ -367,9 +367,8 @@ public void writeBlock(final ExtendedBlock block,
|
|||||||
// make a copy here.
|
// make a copy here.
|
||||||
final ExtendedBlock originalBlock = new ExtendedBlock(block);
|
final ExtendedBlock originalBlock = new ExtendedBlock(block);
|
||||||
block.setNumBytes(dataXceiverServer.estimateBlockSize);
|
block.setNumBytes(dataXceiverServer.estimateBlockSize);
|
||||||
LOG.info("Receiving block " + block +
|
LOG.info("Receiving " + block + " src: " + remoteAddress + " dest: "
|
||||||
" src: " + remoteAddress +
|
+ localAddress);
|
||||||
" dest: " + localAddress);
|
|
||||||
|
|
||||||
// reply to upstream datanode or client
|
// reply to upstream datanode or client
|
||||||
final DataOutputStream replyOut = new DataOutputStream(
|
final DataOutputStream replyOut = new DataOutputStream(
|
||||||
@ -478,9 +477,9 @@ public void writeBlock(final ExtendedBlock block,
|
|||||||
block + " to mirror " + mirrorNode + ": " + e);
|
block + " to mirror " + mirrorNode + ": " + e);
|
||||||
throw e;
|
throw e;
|
||||||
} else {
|
} else {
|
||||||
LOG.info(datanode + ":Exception transfering block " +
|
LOG.info(datanode + ":Exception transfering " +
|
||||||
block + " to mirror " + mirrorNode +
|
block + " to mirror " + mirrorNode +
|
||||||
". continuing without the mirror.", e);
|
"- continuing without the mirror", e);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -528,10 +527,8 @@ public void writeBlock(final ExtendedBlock block,
|
|||||||
if (isDatanode ||
|
if (isDatanode ||
|
||||||
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
|
stage == BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
|
||||||
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
|
datanode.closeBlock(block, DataNode.EMPTY_DEL_HINT);
|
||||||
LOG.info("Received block " + block +
|
LOG.info("Received " + block + " src: " + remoteAddress + " dest: "
|
||||||
" src: " + remoteAddress +
|
+ localAddress + " of size " + block.getNumBytes());
|
||||||
" dest: " + localAddress +
|
|
||||||
" of size " + block.getNumBytes());
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -674,7 +671,7 @@ public void copyBlock(final ExtendedBlock block,
|
|||||||
datanode.metrics.incrBytesRead((int) read);
|
datanode.metrics.incrBytesRead((int) read);
|
||||||
datanode.metrics.incrBlocksRead();
|
datanode.metrics.incrBlocksRead();
|
||||||
|
|
||||||
LOG.info("Copied block " + block + " to " + s.getRemoteSocketAddress());
|
LOG.info("Copied " + block + " to " + s.getRemoteSocketAddress());
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
isOpSuccess = false;
|
isOpSuccess = false;
|
||||||
LOG.info("opCopyBlock " + block + " received exception " + ioe);
|
LOG.info("opCopyBlock " + block + " received exception " + ioe);
|
||||||
@ -797,8 +794,7 @@ public void replaceBlock(final ExtendedBlock block,
|
|||||||
// notify name node
|
// notify name node
|
||||||
datanode.notifyNamenodeReceivedBlock(block, delHint);
|
datanode.notifyNamenodeReceivedBlock(block, delHint);
|
||||||
|
|
||||||
LOG.info("Moved block " + block +
|
LOG.info("Moved " + block + " from " + s.getRemoteSocketAddress());
|
||||||
" from " + s.getRemoteSocketAddress());
|
|
||||||
|
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
opStatus = ERROR;
|
opStatus = ERROR;
|
||||||
|
@ -136,7 +136,7 @@ synchronized void shutdown() {
|
|||||||
if (executors == null) {
|
if (executors == null) {
|
||||||
LOG.warn("AsyncDiskService has already shut down.");
|
LOG.warn("AsyncDiskService has already shut down.");
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Shutting down all async disk service threads...");
|
LOG.info("Shutting down all async disk service threads");
|
||||||
|
|
||||||
for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
|
for (Map.Entry<File, ThreadPoolExecutor> e : executors.entrySet()) {
|
||||||
e.getValue().shutdown();
|
e.getValue().shutdown();
|
||||||
@ -144,7 +144,7 @@ synchronized void shutdown() {
|
|||||||
// clear the executor map so that calling execute again will fail.
|
// clear the executor map so that calling execute again will fail.
|
||||||
executors = null;
|
executors = null;
|
||||||
|
|
||||||
LOG.info("All async disk service threads have been shut down.");
|
LOG.info("All async disk service threads have been shut down");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -154,7 +154,7 @@ synchronized void shutdown() {
|
|||||||
*/
|
*/
|
||||||
void deleteAsync(FsVolumeImpl volume, File blockFile, File metaFile,
|
void deleteAsync(FsVolumeImpl volume, File blockFile, File metaFile,
|
||||||
ExtendedBlock block) {
|
ExtendedBlock block) {
|
||||||
LOG.info("Scheduling block " + block.getLocalBlock()
|
LOG.info("Scheduling " + block.getLocalBlock()
|
||||||
+ " file " + blockFile + " for deletion");
|
+ " file " + blockFile + " for deletion");
|
||||||
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
|
ReplicaFileDeleteTask deletionTask = new ReplicaFileDeleteTask(
|
||||||
volume, blockFile, metaFile, block);
|
volume, blockFile, metaFile, block);
|
||||||
@ -198,8 +198,8 @@ public void run() {
|
|||||||
datanode.notifyNamenodeDeletedBlock(block);
|
datanode.notifyNamenodeDeletedBlock(block);
|
||||||
}
|
}
|
||||||
volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
|
volume.decDfsUsed(block.getBlockPoolId(), dfsBytes);
|
||||||
LOG.info("Deleted block " + block.getBlockPoolId() + " "
|
LOG.info("Deleted " + block.getBlockPoolId() + " "
|
||||||
+ block.getLocalBlock() + " at file " + blockFile);
|
+ block.getLocalBlock() + " file " + blockFile);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -425,7 +425,7 @@ static private void truncateBlock(File blockFile, File metaFile,
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
if (newlen > oldlen) {
|
if (newlen > oldlen) {
|
||||||
throw new IOException("Cannout truncate block to from oldlen (=" + oldlen
|
throw new IOException("Cannot truncate block to from oldlen (=" + oldlen
|
||||||
+ ") to newlen (=" + newlen + ")");
|
+ ") to newlen (=" + newlen + ")");
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -481,7 +481,7 @@ public synchronized ReplicaInPipeline append(ExtendedBlock b,
|
|||||||
" should be greater than the replica " + b + "'s generation stamp");
|
" should be greater than the replica " + b + "'s generation stamp");
|
||||||
}
|
}
|
||||||
ReplicaInfo replicaInfo = getReplicaInfo(b);
|
ReplicaInfo replicaInfo = getReplicaInfo(b);
|
||||||
LOG.info("Appending to replica " + replicaInfo);
|
LOG.info("Appending to " + replicaInfo);
|
||||||
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
|
if (replicaInfo.getState() != ReplicaState.FINALIZED) {
|
||||||
throw new ReplicaNotFoundException(
|
throw new ReplicaNotFoundException(
|
||||||
ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
|
ReplicaNotFoundException.UNFINALIZED_REPLICA + b);
|
||||||
@ -689,7 +689,7 @@ public synchronized ReplicaInPipeline createRbw(ExtendedBlock b)
|
|||||||
public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
|
public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
|
||||||
long newGS, long minBytesRcvd, long maxBytesRcvd)
|
long newGS, long minBytesRcvd, long maxBytesRcvd)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
LOG.info("Recover the RBW replica " + b);
|
LOG.info("Recover RBW replica " + b);
|
||||||
|
|
||||||
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
|
ReplicaInfo replicaInfo = getReplicaInfo(b.getBlockPoolId(), b.getBlockId());
|
||||||
|
|
||||||
@ -700,7 +700,7 @@ public synchronized ReplicaInPipeline recoverRbw(ExtendedBlock b,
|
|||||||
}
|
}
|
||||||
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
|
ReplicaBeingWritten rbw = (ReplicaBeingWritten)replicaInfo;
|
||||||
|
|
||||||
LOG.info("Recovering replica " + rbw);
|
LOG.info("Recovering " + rbw);
|
||||||
|
|
||||||
// Stop the previous writer
|
// Stop the previous writer
|
||||||
rbw.stopWriter();
|
rbw.stopWriter();
|
||||||
@ -736,8 +736,8 @@ public synchronized ReplicaInPipeline convertTemporaryToRbw(
|
|||||||
final long blockId = b.getBlockId();
|
final long blockId = b.getBlockId();
|
||||||
final long expectedGs = b.getGenerationStamp();
|
final long expectedGs = b.getGenerationStamp();
|
||||||
final long visible = b.getNumBytes();
|
final long visible = b.getNumBytes();
|
||||||
LOG.info("Convert replica " + b
|
LOG.info("Convert " + b + " from Temporary to RBW, visible length="
|
||||||
+ " from Temporary to RBW, visible length=" + visible);
|
+ visible);
|
||||||
|
|
||||||
final ReplicaInPipeline temp;
|
final ReplicaInPipeline temp;
|
||||||
{
|
{
|
||||||
@ -1415,8 +1415,7 @@ public synchronized ReplicaRecoveryInfo initReplicaRecovery(
|
|||||||
static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
|
static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
|
||||||
ReplicaMap map, Block block, long recoveryId) throws IOException {
|
ReplicaMap map, Block block, long recoveryId) throws IOException {
|
||||||
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
|
final ReplicaInfo replica = map.get(bpid, block.getBlockId());
|
||||||
LOG.info("initReplicaRecovery: block=" + block
|
LOG.info("initReplicaRecovery: " + block + ", recoveryId=" + recoveryId
|
||||||
+ ", recoveryId=" + recoveryId
|
|
||||||
+ ", replica=" + replica);
|
+ ", replica=" + replica);
|
||||||
|
|
||||||
//check replica
|
//check replica
|
||||||
@ -1485,7 +1484,7 @@ public synchronized String updateReplicaUnderRecovery(
|
|||||||
//get replica
|
//get replica
|
||||||
final String bpid = oldBlock.getBlockPoolId();
|
final String bpid = oldBlock.getBlockPoolId();
|
||||||
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
|
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
|
||||||
LOG.info("updateReplica: block=" + oldBlock
|
LOG.info("updateReplica: " + oldBlock
|
||||||
+ ", recoveryId=" + recoveryId
|
+ ", recoveryId=" + recoveryId
|
||||||
+ ", length=" + newlength
|
+ ", length=" + newlength
|
||||||
+ ", replica=" + replica);
|
+ ", replica=" + replica);
|
||||||
|
@ -140,7 +140,7 @@ boolean hasReadLock() {
|
|||||||
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
|
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_KEY,
|
||||||
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
|
DFSConfigKeys.DFS_NAMENODE_NAME_CACHE_THRESHOLD_DEFAULT);
|
||||||
NameNode.LOG.info("Caching file names occuring more than " + threshold
|
NameNode.LOG.info("Caching file names occuring more than " + threshold
|
||||||
+ " times ");
|
+ " times");
|
||||||
nameCache = new NameCache<ByteArray>(threshold);
|
nameCache = new NameCache<ByteArray>(threshold);
|
||||||
namesystem = ns;
|
namesystem = ns;
|
||||||
}
|
}
|
||||||
@ -253,15 +253,12 @@ INodeFileUnderConstruction addFile(String path,
|
|||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
if (newNode == null) {
|
if (newNode == null) {
|
||||||
NameNode.stateChangeLog.info("DIR* FSDirectory.addFile: "
|
NameNode.stateChangeLog.info("DIR* addFile: failed to add " + path);
|
||||||
+"failed to add "+path
|
|
||||||
+" to the file system");
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("DIR* FSDirectory.addFile: "
|
NameNode.stateChangeLog.debug("DIR* addFile: " + path + " is added");
|
||||||
+path+" is added to the file system");
|
|
||||||
}
|
}
|
||||||
return newNode;
|
return newNode;
|
||||||
}
|
}
|
||||||
@ -2119,16 +2116,13 @@ INodeSymlink addSymlink(String path, String target,
|
|||||||
writeUnlock();
|
writeUnlock();
|
||||||
}
|
}
|
||||||
if (newNode == null) {
|
if (newNode == null) {
|
||||||
NameNode.stateChangeLog.info("DIR* FSDirectory.addSymlink: "
|
NameNode.stateChangeLog.info("DIR* addSymlink: failed to add " + path);
|
||||||
+"failed to add "+path
|
|
||||||
+" to the file system");
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode);
|
fsImage.getEditLog().logSymlink(path, target, modTime, modTime, newNode);
|
||||||
|
|
||||||
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
if(NameNode.stateChangeLog.isDebugEnabled()) {
|
||||||
NameNode.stateChangeLog.debug("DIR* FSDirectory.addSymlink: "
|
NameNode.stateChangeLog.debug("DIR* addSymlink: " + path + " is added");
|
||||||
+path+" is added to the file system");
|
|
||||||
}
|
}
|
||||||
return newNode;
|
return newNode;
|
||||||
}
|
}
|
||||||
|
@ -845,7 +845,7 @@ public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
|
|||||||
* in the new log.
|
* in the new log.
|
||||||
*/
|
*/
|
||||||
synchronized long rollEditLog() throws IOException {
|
synchronized long rollEditLog() throws IOException {
|
||||||
LOG.info("Rolling edit logs.");
|
LOG.info("Rolling edit logs");
|
||||||
endCurrentLogSegment(true);
|
endCurrentLogSegment(true);
|
||||||
|
|
||||||
long nextTxId = getLastWrittenTxId() + 1;
|
long nextTxId = getLastWrittenTxId() + 1;
|
||||||
|
@ -92,7 +92,7 @@ long loadFSEdits(EditLogInputStream edits, long expectedStartingTxId,
|
|||||||
expectedStartingTxId, recovery);
|
expectedStartingTxId, recovery);
|
||||||
FSImage.LOG.info("Edits file " + edits.getName()
|
FSImage.LOG.info("Edits file " + edits.getName()
|
||||||
+ " of size " + edits.length() + " edits # " + numEdits
|
+ " of size " + edits.length() + " edits # " + numEdits
|
||||||
+ " loaded in " + (now()-startTime)/1000 + " seconds.");
|
+ " loaded in " + (now()-startTime)/1000 + " seconds");
|
||||||
return numEdits;
|
return numEdits;
|
||||||
} finally {
|
} finally {
|
||||||
edits.close();
|
edits.close();
|
||||||
|
@ -641,11 +641,11 @@ void startActiveServices() throws IOException {
|
|||||||
editLog.recoverUnclosedStreams();
|
editLog.recoverUnclosedStreams();
|
||||||
|
|
||||||
LOG.info("Catching up to latest edits from old active before " +
|
LOG.info("Catching up to latest edits from old active before " +
|
||||||
"taking over writer role in edits logs.");
|
"taking over writer role in edits logs");
|
||||||
editLogTailer.catchupDuringFailover();
|
editLogTailer.catchupDuringFailover();
|
||||||
blockManager.setPostponeBlocksFromFuture(false);
|
blockManager.setPostponeBlocksFromFuture(false);
|
||||||
|
|
||||||
LOG.info("Reprocessing replication and invalidation queues...");
|
LOG.info("Reprocessing replication and invalidation queues");
|
||||||
blockManager.getDatanodeManager().markAllDatanodesStale();
|
blockManager.getDatanodeManager().markAllDatanodesStale();
|
||||||
blockManager.clearQueues();
|
blockManager.clearQueues();
|
||||||
blockManager.processAllPendingDNMessages();
|
blockManager.processAllPendingDNMessages();
|
||||||
@ -1954,7 +1954,7 @@ private void recoverLeaseInternal(INode fileInode,
|
|||||||
if (force) {
|
if (force) {
|
||||||
// close now: no need to wait for soft lease expiration and
|
// close now: no need to wait for soft lease expiration and
|
||||||
// close only the file src
|
// close only the file src
|
||||||
LOG.info("recoverLease: recover lease " + lease + ", src=" + src +
|
LOG.info("recoverLease: " + lease + ", src=" + src +
|
||||||
" from client " + pendingFile.getClientName());
|
" from client " + pendingFile.getClientName());
|
||||||
internalReleaseLease(lease, src, holder);
|
internalReleaseLease(lease, src, holder);
|
||||||
} else {
|
} else {
|
||||||
@ -1966,8 +1966,8 @@ private void recoverLeaseInternal(INode fileInode,
|
|||||||
// period, then start lease recovery.
|
// period, then start lease recovery.
|
||||||
//
|
//
|
||||||
if (lease.expiredSoftLimit()) {
|
if (lease.expiredSoftLimit()) {
|
||||||
LOG.info("startFile: recover lease " + lease + ", src=" + src +
|
LOG.info("startFile: recover " + lease + ", src=" + src + " client "
|
||||||
" from client " + pendingFile.getClientName());
|
+ pendingFile.getClientName());
|
||||||
boolean isClosed = internalReleaseLease(lease, src, null);
|
boolean isClosed = internalReleaseLease(lease, src, null);
|
||||||
if(!isClosed)
|
if(!isClosed)
|
||||||
throw new RecoveryInProgressException(
|
throw new RecoveryInProgressException(
|
||||||
@ -2143,7 +2143,7 @@ LocatedBlock getAdditionalBlock(String src,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// The retry case ("b" above) -- abandon the old block.
|
// The retry case ("b" above) -- abandon the old block.
|
||||||
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: " +
|
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " +
|
||||||
"caught retry for allocation of a new block in " +
|
"caught retry for allocation of a new block in " +
|
||||||
src + ". Abandoning old block " + lastBlockInFile);
|
src + ". Abandoning old block " + lastBlockInFile);
|
||||||
dir.removeBlock(src, pendingFile, lastBlockInFile);
|
dir.removeBlock(src, pendingFile, lastBlockInFile);
|
||||||
@ -2379,10 +2379,10 @@ private boolean completeFileInternal(String src,
|
|||||||
// See HDFS-3031.
|
// See HDFS-3031.
|
||||||
final Block realLastBlock = ((INodeFile)inode).getLastBlock();
|
final Block realLastBlock = ((INodeFile)inode).getLastBlock();
|
||||||
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
|
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
|
||||||
NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: " +
|
NameNode.stateChangeLog.info("DIR* completeFile: " +
|
||||||
"received request from " + holder + " to complete file " + src +
|
"request from " + holder + " to complete " + src +
|
||||||
" which is already closed. But, it appears to be an RPC " +
|
" which is already closed. But, it appears to be an RPC " +
|
||||||
"retry. Returning success.");
|
"retry. Returning success");
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -2397,8 +2397,8 @@ private boolean completeFileInternal(String src,
|
|||||||
|
|
||||||
finalizeINodeFileUnderConstruction(src, pendingFile);
|
finalizeINodeFileUnderConstruction(src, pendingFile);
|
||||||
|
|
||||||
NameNode.stateChangeLog.info("DIR* NameSystem.completeFile: file " + src
|
NameNode.stateChangeLog.info("DIR* completeFile: " + src + " is closed by "
|
||||||
+ " is closed by " + holder);
|
+ holder);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2423,8 +2423,8 @@ private Block allocateBlock(String src, INode[] inodes,
|
|||||||
nextGenerationStamp();
|
nextGenerationStamp();
|
||||||
b.setGenerationStamp(getGenerationStamp());
|
b.setGenerationStamp(getGenerationStamp());
|
||||||
b = dir.addBlock(src, inodes, b, targets);
|
b = dir.addBlock(src, inodes, b, targets);
|
||||||
NameNode.stateChangeLog.info("BLOCK* NameSystem.allocateBlock: "
|
NameNode.stateChangeLog.info("BLOCK* allocateBlock: " + src + ". "
|
||||||
+src+ ". " + blockPoolId + " "+ b);
|
+ blockPoolId + " " + b);
|
||||||
return b;
|
return b;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2442,8 +2442,8 @@ boolean checkFileProgress(INodeFile v, boolean checkall) {
|
|||||||
//
|
//
|
||||||
for (BlockInfo block: v.getBlocks()) {
|
for (BlockInfo block: v.getBlocks()) {
|
||||||
if (!block.isComplete()) {
|
if (!block.isComplete()) {
|
||||||
LOG.info("BLOCK* NameSystem.checkFileProgress: "
|
LOG.info("BLOCK* checkFileProgress: " + block
|
||||||
+ "block " + block + " has not reached minimal replication "
|
+ " has not reached minimal replication "
|
||||||
+ blockManager.minReplication);
|
+ blockManager.minReplication);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -2454,8 +2454,8 @@ boolean checkFileProgress(INodeFile v, boolean checkall) {
|
|||||||
//
|
//
|
||||||
BlockInfo b = v.getPenultimateBlock();
|
BlockInfo b = v.getPenultimateBlock();
|
||||||
if (b != null && !b.isComplete()) {
|
if (b != null && !b.isComplete()) {
|
||||||
LOG.info("BLOCK* NameSystem.checkFileProgress: "
|
LOG.info("BLOCK* checkFileProgress: " + b
|
||||||
+ "block " + b + " has not reached minimal replication "
|
+ " has not reached minimal replication "
|
||||||
+ blockManager.minReplication);
|
+ blockManager.minReplication);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
@ -2928,8 +2928,7 @@ void setQuota(String path, long nsQuota, long dsQuota)
|
|||||||
*/
|
*/
|
||||||
void fsync(String src, String clientName)
|
void fsync(String src, String clientName)
|
||||||
throws IOException, UnresolvedLinkException {
|
throws IOException, UnresolvedLinkException {
|
||||||
NameNode.stateChangeLog.info("BLOCK* NameSystem.fsync: file "
|
NameNode.stateChangeLog.info("BLOCK* fsync: " + src + " for " + clientName);
|
||||||
+ src + " for " + clientName);
|
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
@ -2960,7 +2959,7 @@ void fsync(String src, String clientName)
|
|||||||
boolean internalReleaseLease(Lease lease, String src,
|
boolean internalReleaseLease(Lease lease, String src,
|
||||||
String recoveryLeaseHolder) throws AlreadyBeingCreatedException,
|
String recoveryLeaseHolder) throws AlreadyBeingCreatedException,
|
||||||
IOException, UnresolvedLinkException {
|
IOException, UnresolvedLinkException {
|
||||||
LOG.info("Recovering lease=" + lease + ", src=" + src);
|
LOG.info("Recovering " + lease + ", src=" + src);
|
||||||
assert !isInSafeMode();
|
assert !isInSafeMode();
|
||||||
assert hasWriteLock();
|
assert hasWriteLock();
|
||||||
|
|
||||||
@ -3601,7 +3600,7 @@ void saveNamespace() throws AccessControlException, IOException {
|
|||||||
"in order to create namespace image.");
|
"in order to create namespace image.");
|
||||||
}
|
}
|
||||||
getFSImage().saveNamespace(this);
|
getFSImage().saveNamespace(this);
|
||||||
LOG.info("New namespace image has been created.");
|
LOG.info("New namespace image has been created");
|
||||||
} finally {
|
} finally {
|
||||||
readUnlock();
|
readUnlock();
|
||||||
}
|
}
|
||||||
@ -3819,11 +3818,11 @@ private synchronized void leave() {
|
|||||||
}
|
}
|
||||||
long timeInSafemode = now() - startTime;
|
long timeInSafemode = now() - startTime;
|
||||||
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
|
NameNode.stateChangeLog.info("STATE* Leaving safe mode after "
|
||||||
+ timeInSafemode/1000 + " secs.");
|
+ timeInSafemode/1000 + " secs");
|
||||||
NameNode.getNameNodeMetrics().setSafeModeTime((int) timeInSafemode);
|
NameNode.getNameNodeMetrics().setSafeModeTime((int) timeInSafemode);
|
||||||
|
|
||||||
if (reached >= 0) {
|
if (reached >= 0) {
|
||||||
NameNode.stateChangeLog.info("STATE* Safe mode is OFF.");
|
NameNode.stateChangeLog.info("STATE* Safe mode is OFF");
|
||||||
}
|
}
|
||||||
reached = -1;
|
reached = -1;
|
||||||
safeMode = null;
|
safeMode = null;
|
||||||
@ -4143,7 +4142,7 @@ public void run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if (!fsRunning) {
|
if (!fsRunning) {
|
||||||
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread. ");
|
LOG.info("NameNode is being shutdown, exit SafeModeMonitor thread");
|
||||||
} else {
|
} else {
|
||||||
// leave safe mode and stop the monitor
|
// leave safe mode and stop the monitor
|
||||||
leaveSafeMode();
|
leaveSafeMode();
|
||||||
@ -4328,7 +4327,7 @@ void enterSafeMode(boolean resourcesLow) throws IOException {
|
|||||||
if (isEditlogOpenForWrite) {
|
if (isEditlogOpenForWrite) {
|
||||||
getEditLog().logSyncAll();
|
getEditLog().logSyncAll();
|
||||||
}
|
}
|
||||||
NameNode.stateChangeLog.info("STATE* Safe mode is ON. "
|
NameNode.stateChangeLog.info("STATE* Safe mode is ON"
|
||||||
+ safeMode.getTurnOffTip());
|
+ safeMode.getTurnOffTip());
|
||||||
} finally {
|
} finally {
|
||||||
writeUnlock();
|
writeUnlock();
|
||||||
@ -4343,7 +4342,7 @@ void leaveSafeMode() {
|
|||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
if (!isInSafeMode()) {
|
if (!isInSafeMode()) {
|
||||||
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF.");
|
NameNode.stateChangeLog.info("STATE* Safe mode is already OFF");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
safeMode.leave();
|
safeMode.leave();
|
||||||
@ -4697,7 +4696,7 @@ void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
|
|||||||
try {
|
try {
|
||||||
checkOperation(OperationCategory.WRITE);
|
checkOperation(OperationCategory.WRITE);
|
||||||
|
|
||||||
NameNode.stateChangeLog.info("*DIR* NameNode.reportBadBlocks");
|
NameNode.stateChangeLog.info("*DIR* reportBadBlocks");
|
||||||
for (int i = 0; i < blocks.length; i++) {
|
for (int i = 0; i < blocks.length; i++) {
|
||||||
ExtendedBlock blk = blocks[i].getBlock();
|
ExtendedBlock blk = blocks[i].getBlock();
|
||||||
DatanodeInfo[] nodes = blocks[i].getLocations();
|
DatanodeInfo[] nodes = blocks[i].getLocations();
|
||||||
|
@ -77,7 +77,7 @@ public Void run() throws IOException {
|
|||||||
});
|
});
|
||||||
|
|
||||||
} catch(Exception e) {
|
} catch(Exception e) {
|
||||||
LOG.info("Exception while sending token. Re-throwing. ", e);
|
LOG.info("Exception while sending token. Re-throwing ", e);
|
||||||
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
resp.sendError(HttpServletResponse.SC_INTERNAL_SERVER_ERROR);
|
||||||
} finally {
|
} finally {
|
||||||
if(dos != null) dos.close();
|
if(dos != null) dos.close();
|
||||||
|
@ -429,7 +429,7 @@ private synchronized void checkLeases() {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("Lease " + oldest + " has expired hard limit");
|
LOG.info(oldest + " has expired hard limit");
|
||||||
|
|
||||||
final List<String> removing = new ArrayList<String>();
|
final List<String> removing = new ArrayList<String>();
|
||||||
// need to create a copy of the oldest lease paths, becuase
|
// need to create a copy of the oldest lease paths, becuase
|
||||||
@ -441,15 +441,14 @@ private synchronized void checkLeases() {
|
|||||||
for(String p : leasePaths) {
|
for(String p : leasePaths) {
|
||||||
try {
|
try {
|
||||||
if(fsnamesystem.internalReleaseLease(oldest, p, HdfsServerConstants.NAMENODE_LEASE_HOLDER)) {
|
if(fsnamesystem.internalReleaseLease(oldest, p, HdfsServerConstants.NAMENODE_LEASE_HOLDER)) {
|
||||||
LOG.info("Lease recovery for file " + p +
|
LOG.info("Lease recovery for " + p + " is complete. File closed.");
|
||||||
" is complete. File closed.");
|
|
||||||
removing.add(p);
|
removing.add(p);
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Started block recovery for file " + p +
|
LOG.info("Started block recovery " + p + " lease " + oldest);
|
||||||
" lease " + oldest);
|
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.error("Cannot release the path "+p+" in the lease "+oldest, e);
|
LOG.error("Cannot release the path " + p + " in the lease "
|
||||||
|
+ oldest, e);
|
||||||
removing.add(p);
|
removing.add(p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -102,7 +102,7 @@ public static void editLogLoaderPrompt(String prompt,
|
|||||||
"without prompting. " +
|
"without prompting. " +
|
||||||
"(c/s/q/a)\n", "c", "s", "q", "a");
|
"(c/s/q/a)\n", "c", "s", "q", "a");
|
||||||
if (answer.equals("c")) {
|
if (answer.equals("c")) {
|
||||||
LOG.info("Continuing.");
|
LOG.info("Continuing");
|
||||||
return;
|
return;
|
||||||
} else if (answer.equals("s")) {
|
} else if (answer.equals("s")) {
|
||||||
throw new RequestStopException("user requested stop");
|
throw new RequestStopException("user requested stop");
|
||||||
|
Loading…
x
Reference in New Issue
Block a user