diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java index 7a052fd175f..9ba2978ac7b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoContiguousUnderConstruction.java @@ -31,7 +31,8 @@ import org.apache.hadoop.hdfs.server.namenode.NameNode; * Represents a block that is currently being constructed.
* This is usually the last block of a file opened for write or append. */ -public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { +public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous + implements BlockInfoUnderConstruction{ /** Block state. See {@link BlockUCState} */ private BlockUCState blockUCState; @@ -94,7 +95,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { return new BlockInfoContiguous(this); } - /** Set expected locations */ + @Override public void setExpectedLocations(DatanodeStorageInfo[] targets) { int numLocations = targets == null ? 0 : targets.length; this.replicas = new ArrayList<>(numLocations); @@ -104,10 +105,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { } } - /** - * Create array of expected replica locations - * (as has been assigned by chooseTargets()). - */ + @Override public DatanodeStorageInfo[] getExpectedStorageLocations() { int numLocations = replicas == null ? 0 : replicas.size(); DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; @@ -117,7 +115,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { return storages; } - /** Get the number of expected locations */ + @Override public int getNumExpectedLocations() { return replicas == null ? 0 : replicas.size(); } @@ -135,25 +133,26 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { blockUCState = s; } - /** Get block recovery ID */ + @Override public long getBlockRecoveryId() { return blockRecoveryId; } - /** Get recover block */ + @Override public Block getTruncateBlock() { return truncateBlock; } + @Override + public Block toBlock(){ + return this; + } + public void setTruncateBlock(Block recoveryBlock) { this.truncateBlock = recoveryBlock; } - /** - * Process the recorded replicas. When about to commit or finish the - * pipeline recovery sort out bad replicas. - * @param genStamp The final generation stamp for the block. - */ + @Override public void setGenerationStampAndVerifyReplicas(long genStamp) { // Set the generation stamp for the block. setGenerationStamp(genStamp); @@ -187,11 +186,7 @@ public class BlockInfoContiguousUnderConstruction extends BlockInfoContiguous { setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); } - /** - * Initialize lease recovery for this block. - * Find the first alive data-node starting from the previous primary and - * make it primary. - */ + @Override public void initializeBlockRecovery(long recoveryId) { setBlockUCState(BlockUCState.UNDER_RECOVERY); blockRecoveryId = recoveryId; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java index b1857bbb88a..cfaf3a0f8ca 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoStripedUnderConstruction.java @@ -31,7 +31,8 @@ import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.BlockUCSt * Represents a striped block that is currently being constructed. * This is usually the last block of a file opened for write or append. */ -public class BlockInfoStripedUnderConstruction extends BlockInfoStriped { +public class BlockInfoStripedUnderConstruction extends BlockInfoStriped + implements BlockInfoUnderConstruction{ private BlockUCState blockUCState; /** @@ -39,6 +40,12 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped { */ private ReplicaUnderConstruction[] replicas; + /** + * Index of the primary data node doing the recovery. Useful for log + * messages. + */ + private int primaryNodeIndex = -1; + /** * The new generation stamp, which this block will have * after the recovery succeeds. Also used as a recovery id to identify @@ -82,6 +89,7 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped { } /** Set expected locations */ + @Override public void setExpectedLocations(DatanodeStorageInfo[] targets) { int numLocations = targets == null ? 0 : targets.length; this.replicas = new ReplicaUnderConstruction[numLocations]; @@ -98,6 +106,7 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped { * Create array of expected replica locations * (as has been assigned by chooseTargets()). */ + @Override public DatanodeStorageInfo[] getExpectedStorageLocations() { int numLocations = getNumExpectedLocations(); DatanodeStorageInfo[] storages = new DatanodeStorageInfo[numLocations]; @@ -117,7 +126,7 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped { return indices; } - /** Get the number of expected locations */ + @Override public int getNumExpectedLocations() { return replicas == null ? 0 : replicas.length; } @@ -135,16 +144,22 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped { blockUCState = s; } - /** Get block recovery ID */ + @Override public long getBlockRecoveryId() { return blockRecoveryId; } - /** - * Process the recorded replicas. When about to commit or finish the - * pipeline recovery sort out bad replicas. - * @param genStamp The final generation stamp for the block. - */ + @Override + public Block getTruncateBlock() { + return null; + } + + @Override + public Block toBlock(){ + return this; + } + + @Override public void setGenerationStampAndVerifyReplicas(long genStamp) { // Set the generation stamp for the block. setGenerationStamp(genStamp); @@ -178,18 +193,53 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped { setGenerationStampAndVerifyReplicas(block.getGenerationStamp()); } - /** - * Initialize lease recovery for this striped block. - */ + @Override public void initializeBlockRecovery(long recoveryId) { setBlockUCState(BlockUCState.UNDER_RECOVERY); blockRecoveryId = recoveryId; if (replicas == null || replicas.length == 0) { NameNode.blockStateChangeLog.warn("BLOCK*" + - " BlockInfoUnderConstruction.initLeaseRecovery:" + + " BlockInfoStripedUnderConstruction.initLeaseRecovery:" + " No blocks found, lease removed."); } - // TODO we need to implement different recovery logic here + boolean allLiveReplicasTriedAsPrimary = true; + for (ReplicaUnderConstruction replica : replicas) { + // Check if all replicas have been tried or not. + if (replica.isAlive()) { + allLiveReplicasTriedAsPrimary = (allLiveReplicasTriedAsPrimary && + replica.getChosenAsPrimary()); + } + } + if (allLiveReplicasTriedAsPrimary) { + // Just set all the replicas to be chosen whether they are alive or not. + for (ReplicaUnderConstruction replica : replicas) { + replica.setChosenAsPrimary(false); + } + } + long mostRecentLastUpdate = 0; + ReplicaUnderConstruction primary = null; + primaryNodeIndex = -1; + for(int i = 0; i < replicas.length; i++) { + // Skip alive replicas which have been chosen for recovery. + if (!(replicas[i].isAlive() && !replicas[i].getChosenAsPrimary())) { + continue; + } + final ReplicaUnderConstruction ruc = replicas[i]; + final long lastUpdate = ruc.getExpectedStorageLocation() + .getDatanodeDescriptor().getLastUpdateMonotonic(); + if (lastUpdate > mostRecentLastUpdate) { + primaryNodeIndex = i; + primary = ruc; + mostRecentLastUpdate = lastUpdate; + } + } + if (primary != null) { + primary.getExpectedStorageLocation().getDatanodeDescriptor() + .addBlockToBeRecovered(this); + primary.setChosenAsPrimary(true); + NameNode.blockStateChangeLog.info( + "BLOCK* {} recovery started, primary={}", this, primary); + } } void addReplicaIfNotPresent(DatanodeStorageInfo storage, Block reportedBlock, @@ -238,7 +288,9 @@ public class BlockInfoStripedUnderConstruction extends BlockInfoStriped { } private void appendUCParts(StringBuilder sb) { - sb.append("{UCState=").append(blockUCState).append(", replicas=["); + sb.append("{UCState=").append(blockUCState). + append(", primaryNodeIndex=").append(primaryNodeIndex). + append(", replicas=["); if (replicas != null) { int i = 0; for (ReplicaUnderConstruction r : replicas) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java new file mode 100644 index 00000000000..bfdd3864a5e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockInfoUnderConstruction.java @@ -0,0 +1,57 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import org.apache.hadoop.hdfs.protocol.Block; + +public interface BlockInfoUnderConstruction { + /** + * Create array of expected replica locations + * (as has been assigned by chooseTargets()). + */ + public DatanodeStorageInfo[] getExpectedStorageLocations(); + + /** Get recover block */ + public Block getTruncateBlock(); + + /** Convert to a Block object */ + public Block toBlock(); + + /** Get block recovery ID */ + public long getBlockRecoveryId(); + + /** Get the number of expected locations */ + public int getNumExpectedLocations(); + + /** Set expected locations */ + public void setExpectedLocations(DatanodeStorageInfo[] targets); + + /** + * Process the recorded replicas. When about to commit or finish the + * pipeline recovery sort out bad replicas. + * @param genStamp The final generation stamp for the block. + */ + public void setGenerationStampAndVerifyReplicas(long genStamp); + + /** + * Initialize lease recovery for this block. + * Find the first alive data-node starting from the previous primary and + * make it primary. + */ + public void initializeBlockRecovery(long recoveryId); +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java index 15427f75906..7ec71a27e09 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeDescriptor.java @@ -253,8 +253,8 @@ public class DatanodeDescriptor extends DatanodeInfo { private final BlockQueue erasurecodeBlocks = new BlockQueue<>(); /** A queue of blocks to be recovered by this datanode */ - private final BlockQueue - recoverBlocks = new BlockQueue<>(); + private final BlockQueue recoverBlocks = + new BlockQueue<>(); /** A set of blocks to be invalidated by this datanode */ private final LightWeightHashSet invalidateBlocks = new LightWeightHashSet<>(); @@ -649,7 +649,7 @@ public class DatanodeDescriptor extends DatanodeInfo { /** * Store block recovery work. */ - void addBlockToBeRecovered(BlockInfoContiguousUnderConstruction block) { + void addBlockToBeRecovered(BlockInfoUnderConstruction block) { if(recoverBlocks.contains(block)) { // this prevents adding the same block twice to the recovery queue BlockManager.LOG.info(block + " is already in the recovery queue"); @@ -703,11 +703,11 @@ public class DatanodeDescriptor extends DatanodeInfo { return erasurecodeBlocks.poll(maxTransfers); } - public BlockInfoContiguousUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) { - List blocks = recoverBlocks.poll(maxTransfers); + public BlockInfoUnderConstruction[] getLeaseRecoveryCommand(int maxTransfers) { + List blocks = recoverBlocks.poll(maxTransfers); if(blocks == null) return null; - return blocks.toArray(new BlockInfoContiguousUnderConstruction[blocks.size()]); + return blocks.toArray(new BlockInfoUnderConstruction[blocks.size()]); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index c63e657df60..8a78a0be991 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -1379,12 +1379,12 @@ public class DatanodeManager { } //check lease recovery - BlockInfoContiguousUnderConstruction[] blocks = nodeinfo + BlockInfoUnderConstruction[] blocks = nodeinfo .getLeaseRecoveryCommand(Integer.MAX_VALUE); if (blocks != null) { BlockRecoveryCommand brCommand = new BlockRecoveryCommand( blocks.length); - for (BlockInfoContiguousUnderConstruction b : blocks) { + for (BlockInfoUnderConstruction b : blocks) { final DatanodeStorageInfo[] storages = b.getExpectedStorageLocations(); // Skip stale nodes during recovery - not heart beated for some time (30s by default). final List recoveryLocations = @@ -1398,10 +1398,10 @@ public class DatanodeManager { // to old block. boolean truncateRecovery = b.getTruncateBlock() != null; boolean copyOnTruncateRecovery = truncateRecovery && - b.getTruncateBlock().getBlockId() != b.getBlockId(); + b.getTruncateBlock().getBlockId() != b.toBlock().getBlockId(); ExtendedBlock primaryBlock = (copyOnTruncateRecovery) ? new ExtendedBlock(blockPoolId, b.getTruncateBlock()) : - new ExtendedBlock(blockPoolId, b); + new ExtendedBlock(blockPoolId, b.toBlock()); // If we only get 1 replica after eliminating stale nodes, then choose all // replicas for recovery and let the primary data node handle failures. DatanodeInfo[] recoveryInfos; @@ -1418,7 +1418,7 @@ public class DatanodeManager { recoveryInfos = DatanodeStorageInfo.toDatanodeInfos(storages); } if(truncateRecovery) { - Block recoveryBlock = (copyOnTruncateRecovery) ? b : + Block recoveryBlock = (copyOnTruncateRecovery) ? b.toBlock() : b.getTruncateBlock(); brCommand.add(new RecoveringBlock(primaryBlock, recoveryInfos, recoveryBlock)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index fa874d151d4..d19a1db8175 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -208,6 +208,7 @@ import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfo; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguous; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoContiguousUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoStripedUnderConstruction; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockInfoUnderConstruction; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeDescriptor; import org.apache.hadoop.hdfs.server.blockmanagement.DatanodeManager; @@ -3567,18 +3568,19 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, throw new AlreadyBeingCreatedException(message); case UNDER_CONSTRUCTION: case UNDER_RECOVERY: - // TODO support Striped block's recovery - final BlockInfoContiguousUnderConstruction uc = - (BlockInfoContiguousUnderConstruction)lastBlock; + // TODO support truncate of striped blocks + final BlockInfoUnderConstruction uc = + (BlockInfoUnderConstruction)lastBlock; // determine if last block was intended to be truncated Block recoveryBlock = uc.getTruncateBlock(); boolean truncateRecovery = recoveryBlock != null; boolean copyOnTruncate = truncateRecovery && - recoveryBlock.getBlockId() != uc.getBlockId(); + recoveryBlock.getBlockId() != uc.toBlock().getBlockId(); assert !copyOnTruncate || - recoveryBlock.getBlockId() < uc.getBlockId() && - recoveryBlock.getGenerationStamp() < uc.getGenerationStamp() && - recoveryBlock.getNumBytes() > uc.getNumBytes() : + recoveryBlock.getBlockId() < uc.toBlock().getBlockId() && + recoveryBlock.getGenerationStamp() < uc.toBlock(). + getGenerationStamp() && + recoveryBlock.getNumBytes() > uc.toBlock().getNumBytes() : "wrong recoveryBlock"; // setup the last block locations from the blockManager if not known @@ -3586,7 +3588,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, uc.setExpectedLocations(blockManager.getStorages(lastBlock)); } - if (uc.getNumExpectedLocations() == 0 && uc.getNumBytes() == 0) { + if (uc.getNumExpectedLocations() == 0 && + uc.toBlock().getNumBytes() == 0) { // There is no datanode reported to this block. // may be client have crashed before writing data to pipeline. // This blocks doesn't need any recovery. @@ -3599,10 +3602,11 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, return true; } // start recovery of the last block for this file - long blockRecoveryId = nextGenerationStamp(blockIdManager.isLegacyBlock(uc)); + long blockRecoveryId = + nextGenerationStamp(blockIdManager.isLegacyBlock(uc.toBlock())); lease = reassignLease(lease, src, recoveryLeaseHolder, pendingFile); if(copyOnTruncate) { - uc.setGenerationStamp(blockRecoveryId); + uc.toBlock().setGenerationStamp(blockRecoveryId); } else if(truncateRecovery) { recoveryBlock.setGenerationStamp(blockRecoveryId); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java index a7ba29399dc..f5a9cc41582 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockInfoUnderConstruction.java @@ -51,7 +51,7 @@ public class TestBlockInfoUnderConstruction { DFSTestUtil.resetLastUpdatesWithOffset(dd2, -1 * 1000); DFSTestUtil.resetLastUpdatesWithOffset(dd3, -2 * 1000); blockInfo.initializeBlockRecovery(1); - BlockInfoContiguousUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1); + BlockInfoUnderConstruction[] blockInfoRecovery = dd2.getLeaseRecoveryCommand(1); assertEquals(blockInfoRecovery[0], blockInfo); // Recovery attempt #2.