HDFS-3105. Add DatanodeStorage information to block recovery.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1302683 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-03-19 22:09:14 +00:00
parent 9d8d02b68b
commit 6326605acb
18 changed files with 127 additions and 96 deletions

View File

@ -241,6 +241,8 @@ Release 0.23.3 - UNRELEASED
HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo) HDFS-3088. Move FSDatasetInterface inner classes to a package. (szetszwo)
HDFS-3105. Add DatanodeStorage information to block recovery. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
HDFS-3024. Improve performance of stringification in addStoredBlock (todd) HDFS-3024. Improve performance of stringification in addStoredBlock (todd)

View File

@ -293,7 +293,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements
@Override @Override
public void commitBlockSynchronization(ExtendedBlock block, public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength, boolean closeFile, long newgenerationstamp, long newlength, boolean closeFile,
boolean deleteblock, DatanodeID[] newtargets) throws IOException { boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages
) throws IOException {
CommitBlockSynchronizationRequestProto.Builder builder = CommitBlockSynchronizationRequestProto.Builder builder =
CommitBlockSynchronizationRequestProto.newBuilder() CommitBlockSynchronizationRequestProto.newBuilder()
.setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp) .setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp)
@ -301,6 +302,7 @@ public class DatanodeProtocolClientSideTranslatorPB implements
.setDeleteBlock(deleteblock); .setDeleteBlock(deleteblock);
for (int i = 0; i < newtargets.length; i++) { for (int i = 0; i < newtargets.length; i++) {
builder.addNewTaragets(PBHelper.convert(newtargets[i])); builder.addNewTaragets(PBHelper.convert(newtargets[i]));
builder.addNewTargetStorages(newtargetstorages[i]);
} }
CommitBlockSynchronizationRequestProto req = builder.build(); CommitBlockSynchronizationRequestProto req = builder.build();
try { try {

View File

@ -259,10 +259,12 @@ public class DatanodeProtocolServerSideTranslatorPB implements
for (int i = 0; i < dnprotos.size(); i++) { for (int i = 0; i < dnprotos.size(); i++) {
dns[i] = PBHelper.convert(dnprotos.get(i)); dns[i] = PBHelper.convert(dnprotos.get(i));
} }
final List<String> sidprotos = request.getNewTargetStoragesList();
final String[] storageIDs = sidprotos.toArray(new String[sidprotos.size()]);
try { try {
impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()), impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()),
request.getNewGenStamp(), request.getNewLength(), request.getNewGenStamp(), request.getNewLength(),
request.getCloseFile(), request.getDeleteBlock(), dns); request.getCloseFile(), request.getDeleteBlock(), dns, storageIDs);
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }

View File

@ -20,7 +20,6 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException; import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto; import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto;
@ -66,14 +65,15 @@ public class InterDatanodeProtocolServerSideTranslatorPB implements
public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery( public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery(
RpcController unused, UpdateReplicaUnderRecoveryRequestProto request) RpcController unused, UpdateReplicaUnderRecoveryRequestProto request)
throws ServiceException { throws ServiceException {
ExtendedBlock b; final String storageID;
try { try {
b = impl.updateReplicaUnderRecovery(PBHelper.convert(request.getBlock()), storageID = impl.updateReplicaUnderRecovery(
PBHelper.convert(request.getBlock()),
request.getRecoveryId(), request.getNewLength()); request.getRecoveryId(), request.getNewLength());
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
} }
return UpdateReplicaUnderRecoveryResponseProto.newBuilder() return UpdateReplicaUnderRecoveryResponseProto.newBuilder()
.setBlock(PBHelper.convert(b)).build(); .setStorageID(storageID).build();
} }
} }

View File

@ -91,15 +91,15 @@ public class InterDatanodeProtocolTranslatorPB implements
} }
@Override @Override
public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newLength) throws IOException { long recoveryId, long newLength) throws IOException {
UpdateReplicaUnderRecoveryRequestProto req = UpdateReplicaUnderRecoveryRequestProto req =
UpdateReplicaUnderRecoveryRequestProto.newBuilder() UpdateReplicaUnderRecoveryRequestProto.newBuilder()
.setBlock(PBHelper.convert(oldBlock)) .setBlock(PBHelper.convert(oldBlock))
.setNewLength(newLength).setRecoveryId(recoveryId).build(); .setNewLength(newLength).setRecoveryId(recoveryId).build();
try { try {
return PBHelper.convert(rpcProxy.updateReplicaUnderRecovery( return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req
NULL_CONTROLLER, req).getBlock()); ).getStorageID();
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }

View File

@ -1765,10 +1765,9 @@ public class DataNode extends Configured
* Update replica with the new generation stamp and length. * Update replica with the new generation stamp and length.
*/ */
@Override // InterDatanodeProtocol @Override // InterDatanodeProtocol
public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
long recoveryId, final long recoveryId, final long newLength) throws IOException {
long newLength) throws IOException { final String storageID = data.updateReplicaUnderRecovery(oldBlock,
ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newLength); recoveryId, newLength);
// Notify the namenode of the updated block info. This is important // Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the // for HA, since otherwise the standby node may lose track of the
@ -1777,7 +1776,7 @@ public class DataNode extends Configured
newBlock.setGenerationStamp(recoveryId); newBlock.setGenerationStamp(recoveryId);
newBlock.setNumBytes(newLength); newBlock.setNumBytes(newLength);
notifyNamenodeReceivedBlock(newBlock, ""); notifyNamenodeReceivedBlock(newBlock, "");
return new ExtendedBlock(oldBlock.getBlockPoolId(), r); return storageID;
} }
/** A convenient class used in block recovery */ /** A convenient class used in block recovery */
@ -1786,6 +1785,8 @@ public class DataNode extends Configured
final InterDatanodeProtocol datanode; final InterDatanodeProtocol datanode;
final ReplicaRecoveryInfo rInfo; final ReplicaRecoveryInfo rInfo;
private String storageID;
BlockRecord(DatanodeID id, BlockRecord(DatanodeID id,
InterDatanodeProtocol datanode, InterDatanodeProtocol datanode,
ReplicaRecoveryInfo rInfo) { ReplicaRecoveryInfo rInfo) {
@ -1794,6 +1795,12 @@ public class DataNode extends Configured
this.rInfo = rInfo; this.rInfo = rInfo;
} }
void updateReplicaUnderRecovery(String bpid, long recoveryId, long newLength
) throws IOException {
final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newLength);
}
@Override @Override
public String toString() { public String toString() {
return "block:" + rInfo + " node:" + id; return "block:" + rInfo + " node:" + id;
@ -1870,6 +1877,7 @@ public class DataNode extends Configured
void syncBlock(RecoveringBlock rBlock, void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException { List<BlockRecord> syncList) throws IOException {
ExtendedBlock block = rBlock.getBlock(); ExtendedBlock block = rBlock.getBlock();
final String bpid = block.getBlockPoolId();
DatanodeProtocolClientSideTranslatorPB nn = DatanodeProtocolClientSideTranslatorPB nn =
getActiveNamenodeForBP(block.getBlockPoolId()); getActiveNamenodeForBP(block.getBlockPoolId());
if (nn == null) { if (nn == null) {
@ -1889,7 +1897,7 @@ public class DataNode extends Configured
// The block can be deleted. // The block can be deleted.
if (syncList.isEmpty()) { if (syncList.isEmpty()) {
nn.commitBlockSynchronization(block, recoveryId, 0, nn.commitBlockSynchronization(block, recoveryId, 0,
true, true, DatanodeID.EMPTY_ARRAY); true, true, DatanodeID.EMPTY_ARRAY, null);
return; return;
} }
@ -1912,8 +1920,8 @@ public class DataNode extends Configured
// Calculate list of nodes that will participate in the recovery // Calculate list of nodes that will participate in the recovery
// and the new block size // and the new block size
List<BlockRecord> participatingList = new ArrayList<BlockRecord>(); List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
final ExtendedBlock newBlock = new ExtendedBlock(block.getBlockPoolId(), block final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
.getBlockId(), -1, recoveryId); -1, recoveryId);
switch(bestState) { switch(bestState) {
case FINALIZED: case FINALIZED:
assert finalizedLength > 0 : "finalizedLength is not positive"; assert finalizedLength > 0 : "finalizedLength is not positive";
@ -1944,16 +1952,11 @@ public class DataNode extends Configured
} }
List<DatanodeID> failedList = new ArrayList<DatanodeID>(); List<DatanodeID> failedList = new ArrayList<DatanodeID>();
List<DatanodeID> successList = new ArrayList<DatanodeID>(); final List<BlockRecord> successList = new ArrayList<BlockRecord>();
for(BlockRecord r : participatingList) { for(BlockRecord r : participatingList) {
try { try {
ExtendedBlock reply = r.datanode.updateReplicaUnderRecovery( r.updateReplicaUnderRecovery(bpid, recoveryId, newBlock.getNumBytes());
new ExtendedBlock(newBlock.getBlockPoolId(), r.rInfo), recoveryId, successList.add(r);
newBlock.getNumBytes());
assert reply.equals(newBlock) &&
reply.getNumBytes() == newBlock.getNumBytes() :
"Updated replica must be the same as the new block.";
successList.add(r.id);
} catch (IOException e) { } catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock=" InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ newBlock + ", datanode=" + r.id + ")", e); + newBlock + ", datanode=" + r.id + ")", e);
@ -1974,10 +1977,16 @@ public class DataNode extends Configured
} }
// Notify the name-node about successfully recovered replicas. // Notify the name-node about successfully recovered replicas.
DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]); final DatanodeID[] datanodes = new DatanodeID[successList.size()];
final String[] storages = new String[datanodes.length];
for(int i = 0; i < datanodes.length; i++) {
final BlockRecord r = successList.get(i);
datanodes[i] = r.id;
storages[i] = r.storageID;
}
nn.commitBlockSynchronization(block, nn.commitBlockSynchronization(block,
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false, newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
nlist); datanodes, storages);
} }
private static void logRecoverBlock(String who, private static void logRecoverBlock(String who,

View File

@ -553,14 +553,16 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
*/ */
static class FSVolume implements FsVolumeSpi { static class FSVolume implements FsVolumeSpi {
private final FSDataset dataset; private final FSDataset dataset;
private final String storageID;
private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>(); private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
private final File currentDir; // <StorageDirectory>/current private final File currentDir; // <StorageDirectory>/current
private final DF usage; private final DF usage;
private final long reserved; private final long reserved;
FSVolume(FSDataset dataset, File currentDir, Configuration conf FSVolume(FSDataset dataset, String storageID, File currentDir,
) throws IOException { Configuration conf) throws IOException {
this.dataset = dataset; this.dataset = dataset;
this.storageID = storageID;
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY, this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT); DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
this.currentDir = currentDir; this.currentDir = currentDir;
@ -808,6 +810,10 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
} }
} }
} }
String getStorageID() {
return storageID;
}
} }
static class FSVolumeSet { static class FSVolumeSet {
@ -1017,6 +1023,12 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
return volumes.volumes; return volumes.volumes;
} }
@Override
public synchronized FSVolume getVolume(final ExtendedBlock b) {
final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null? (FSVolume)r.getVolume(): null;
}
@Override // FSDatasetInterface @Override // FSDatasetInterface
public synchronized Block getStoredBlock(String bpid, long blkid) public synchronized Block getStoredBlock(String bpid, long blkid)
throws IOException { throws IOException {
@ -1107,7 +1119,7 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
storage.getNumStorageDirs()); storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) { for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir(); final File dir = storage.getStorageDir(idx).getCurrentDir();
volArray.add(new FSVolume(this, dir, conf)); volArray.add(new FSVolume(this, storage.getStorageID(), dir, conf));
DataNode.LOG.info("FSDataset added volume - " + dir); DataNode.LOG.info("FSDataset added volume - " + dir);
} }
volumeMap = new ReplicasMap(this); volumeMap = new ReplicasMap(this);
@ -1758,19 +1770,6 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
channel.position(newPos); channel.position(newPos);
} }
synchronized File createTmpFile(FSVolume vol, String bpid, Block blk) throws IOException {
if ( vol == null ) {
ReplicaInfo replica = volumeMap.get(bpid, blk);
if (replica != null) {
vol = (FSVolume)volumeMap.get(bpid, blk).getVolume();
}
if ( vol == null ) {
throw new IOException("Could not find volume for block " + blk);
}
}
return vol.createTmpFile(bpid, blk);
}
// //
// REMIND - mjc - eventually we should have a timeout system // REMIND - mjc - eventually we should have a timeout system
// in place to clean up block files left by abandoned clients. // in place to clean up block files left by abandoned clients.
@ -2421,13 +2420,13 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
} }
@Override // FSDatasetInterface @Override // FSDatasetInterface
public synchronized ReplicaInfo updateReplicaUnderRecovery( public synchronized String updateReplicaUnderRecovery(
final ExtendedBlock oldBlock, final ExtendedBlock oldBlock,
final long recoveryId, final long recoveryId,
final long newlength) throws IOException { final long newlength) throws IOException {
//get replica //get replica
final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockPoolId(), final String bpid = oldBlock.getBlockPoolId();
oldBlock.getBlockId()); final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
DataNode.LOG.info("updateReplica: block=" + oldBlock DataNode.LOG.info("updateReplica: block=" + oldBlock
+ ", recoveryId=" + recoveryId + ", recoveryId=" + recoveryId
+ ", length=" + newlength + ", length=" + newlength
@ -2457,10 +2456,18 @@ class FSDataset implements FSDatasetInterface<FSDataset.FSVolume> {
//update replica //update replica
final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
.getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength); .getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength);
assert finalized.getBlockId() == oldBlock.getBlockId()
&& finalized.getGenerationStamp() == recoveryId
&& finalized.getNumBytes() == newlength
: "Replica information mismatched: oldBlock=" + oldBlock
+ ", recoveryId=" + recoveryId + ", newlength=" + newlength
+ ", finalized=" + finalized;
//check replica files after update //check replica files after update
checkReplicaFiles(finalized); checkReplicaFiles(finalized);
return finalized;
//return storage ID
return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID();
} }
private FinalizedReplica updateReplicaUnderRecovery( private FinalizedReplica updateReplicaUnderRecovery(

View File

@ -87,6 +87,9 @@ public interface FSDatasetInterface<V extends FsVolumeSpi>
/** @return a list of volumes. */ /** @return a list of volumes. */
public List<V> getVolumes(); public List<V> getVolumes();
/** @return the volume that contains a replica of the block. */
public V getVolume(ExtendedBlock b);
/** @return a volume information map (name => info). */ /** @return a volume information map (name => info). */
public Map<String, Object> getVolumeInfoMap(); public Map<String, Object> getVolumeInfoMap();
@ -336,11 +339,11 @@ public interface FSDatasetInterface<V extends FsVolumeSpi>
/** /**
* Update replica's generation stamp and length and finalize it. * Update replica's generation stamp and length and finalize it.
* @return the ID of storage that stores the block
*/ */
public ReplicaInfo updateReplicaUnderRecovery( public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
ExtendedBlock oldBlock, long recoveryId, long newLength) throws IOException;
long recoveryId,
long newLength) throws IOException;
/** /**
* add new block pool ID * add new block pool ID
* @param bpid Block pool Id * @param bpid Block pool Id

View File

@ -2830,7 +2830,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
void commitBlockSynchronization(ExtendedBlock lastblock, void commitBlockSynchronization(ExtendedBlock lastblock,
long newgenerationstamp, long newlength, long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
String[] newtargetstorages)
throws IOException, UnresolvedLinkException { throws IOException, UnresolvedLinkException {
String src = ""; String src = "";
writeLock(); writeLock();

View File

@ -546,10 +546,11 @@ class NameNodeRpcServer implements NamenodeProtocols {
@Override // DatanodeProtocol @Override // DatanodeProtocol
public void commitBlockSynchronization(ExtendedBlock block, public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength, long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets) boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
String[] newtargetstorages)
throws IOException { throws IOException {
namesystem.commitBlockSynchronization(block, namesystem.commitBlockSynchronization(block, newgenerationstamp,
newgenerationstamp, newlength, closeFile, deleteblock, newtargets); newlength, closeFile, deleteblock, newtargets, newtargetstorages);
} }
@Override // ClientProtocol @Override // ClientProtocol

View File

@ -176,6 +176,6 @@ public interface DatanodeProtocol {
*/ */
public void commitBlockSynchronization(ExtendedBlock block, public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength, long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
) throws IOException; String[] newtargetstorages) throws IOException;
} }

View File

@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock; import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock; import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.KerberosInfo;
/** An inter-datanode protocol for updating generation stamp /** An inter-datanode protocol for updating generation stamp
@ -55,9 +54,6 @@ public interface InterDatanodeProtocol {
* *
* For more details on protocol buffer wire protocol, please see * For more details on protocol buffer wire protocol, please see
* .../org/apache/hadoop/hdfs/protocolPB/overview.html * .../org/apache/hadoop/hdfs/protocolPB/overview.html
*
* The log of historical changes can be retrieved from the svn).
* 6: Add block pool ID to Block
*/ */
public static final long versionID = 6L; public static final long versionID = 6L;
@ -73,7 +69,6 @@ public interface InterDatanodeProtocol {
/** /**
* Update replica with the new generation stamp and length. * Update replica with the new generation stamp and length.
*/ */
ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock, String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId,
long recoveryId, long newLength) throws IOException;
long newLength) throws IOException;
} }

View File

@ -339,6 +339,7 @@ message CommitBlockSynchronizationRequestProto {
required bool closeFile = 4; required bool closeFile = 4;
required bool deleteBlock = 5; required bool deleteBlock = 5;
repeated DatanodeIDProto newTaragets = 6; repeated DatanodeIDProto newTaragets = 6;
repeated string newTargetStorages = 7;
} }
/** /**

View File

@ -55,7 +55,7 @@ message UpdateReplicaUnderRecoveryRequestProto {
* Response returns updated block information * Response returns updated block information
*/ */
message UpdateReplicaUnderRecoveryResponseProto { message UpdateReplicaUnderRecoveryResponseProto {
required ExtendedBlockProto block = 1; // Updated block information required string storageID = 1; // ID of the storage that stores replica
} }
/** /**

View File

@ -903,11 +903,10 @@ public class SimulatedFSDataset implements FSDatasetInterface<FsVolumeSpi> {
} }
@Override // FSDatasetInterface @Override // FSDatasetInterface
public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock, public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long recoveryId,
long newlength) { long newlength) {
return new FinalizedReplica( return storageId;
oldBlock.getBlockId(), newlength, recoveryId, null, null);
} }
@Override // FSDatasetInterface @Override // FSDatasetInterface
@ -985,4 +984,9 @@ public class SimulatedFSDataset implements FSDatasetInterface<FsVolumeSpi> {
public RollingLogs createRollingLogs(String bpid, String prefix) { public RollingLogs createRollingLogs(String bpid, String prefix) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@Override
public FsVolumeSpi getVolume(ExtendedBlock b) {
throw new UnsupportedOperationException();
}
} }

View File

@ -18,6 +18,27 @@
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger; import org.apache.commons.logging.impl.Log4JLogger;
@ -34,10 +55,10 @@ import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException; import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord; import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -45,10 +66,9 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo; import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
@ -62,16 +82,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/** /**
* This tests if sync all replicas in block recovery works correctly * This tests if sync all replicas in block recovery works correctly
*/ */
@ -196,11 +206,9 @@ public class TestBlockRecovery {
syncList.add(record2); syncList.add(record2);
when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong())).thenReturn(new ExtendedBlock(block.getBlockPoolId(), anyLong())).thenReturn("storage1");
block.getBlockId(), expectLen, block.getGenerationStamp()));
when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(), when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong())).thenReturn(new ExtendedBlock(block.getBlockPoolId(), anyLong())).thenReturn("storage2");
block.getBlockId(), expectLen, block.getGenerationStamp()));
dn.syncBlock(rBlock, syncList); dn.syncBlock(rBlock, syncList);
} }
@ -463,7 +471,7 @@ public class TestBlockRecovery {
d.join(); d.join();
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID); DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
verify(dnP).commitBlockSynchronization( verify(dnP).commitBlockSynchronization(
block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY); block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
} }
private List<BlockRecord> initBlockRecords(DataNode spyDN) throws IOException { private List<BlockRecord> initBlockRecords(DataNode spyDN) throws IOException {
@ -521,7 +529,7 @@ public class TestBlockRecovery {
DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
verify(namenode, never()).commitBlockSynchronization( verify(namenode, never()).commitBlockSynchronization(
any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
anyBoolean(), any(DatanodeID[].class)); anyBoolean(), any(DatanodeID[].class), any(String[].class));
} }
/** /**
@ -550,7 +558,7 @@ public class TestBlockRecovery {
DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID); DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
verify(namenode, never()).commitBlockSynchronization( verify(namenode, never()).commitBlockSynchronization(
any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(), any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
anyBoolean(), any(DatanodeID[].class)); anyBoolean(), any(DatanodeID[].class), any(String[].class));
} finally { } finally {
streams.close(); streams.close();
} }

View File

@ -329,14 +329,9 @@ public class TestInterDatanodeProtocol {
} }
//update //update
final ReplicaInfo finalized = fsdataset.updateReplicaUnderRecovery( final String storageID = fsdataset.updateReplicaUnderRecovery(
new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength); new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength);
assertTrue(storageID != null);
//check meta data after update
FSDataset.checkReplicaFiles(finalized);
Assert.assertEquals(b.getBlockId(), finalized.getBlockId());
Assert.assertEquals(recoveryid, finalized.getGenerationStamp());
Assert.assertEquals(newlength, finalized.getNumBytes());
} finally { } finally {
if (cluster != null) cluster.shutdown(); if (cluster != null) cluster.shutdown();

View File

@ -307,7 +307,8 @@ public class TestPipelinesFailover {
Mockito.anyLong(), // new length Mockito.anyLong(), // new length
Mockito.eq(true), // close file Mockito.eq(true), // close file
Mockito.eq(false), // delete block Mockito.eq(false), // delete block
(DatanodeID[]) Mockito.anyObject()); // new targets (DatanodeID[]) Mockito.anyObject(), // new targets
(String[]) Mockito.anyObject()); // new target storages
DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf); DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
assertFalse(fsOtherUser.recoverLease(TEST_PATH)); assertFalse(fsOtherUser.recoverLease(TEST_PATH));