svn merge -c 1302683 from trunk for HDFS-3105.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.23@1302685 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-03-19 22:12:38 +00:00
parent c02c9218d9
commit 35e04b4c5e
18 changed files with 127 additions and 96 deletions

View File

@ -148,6 +148,8 @@ Release 0.23.3 - UNRELEASED
HDFS-3091. Update the usage limitations of ReplaceDatanodeOnFailure policy in
the config description for the smaller clusters. (szetszwo via umamahesh)
HDFS-3105. Add DatanodeStorage information to block recovery. (szetszwo)
OPTIMIZATIONS
HDFS-2477. Optimize computing the diff between a block report and the
namenode state. (Tomasz Nykiel via hairong)

View File

@ -293,7 +293,8 @@ public void reportBadBlocks(LocatedBlock[] blocks) throws IOException {
@Override
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength, boolean closeFile,
boolean deleteblock, DatanodeID[] newtargets) throws IOException {
boolean deleteblock, DatanodeID[] newtargets, String[] newtargetstorages
) throws IOException {
CommitBlockSynchronizationRequestProto.Builder builder =
CommitBlockSynchronizationRequestProto.newBuilder()
.setBlock(PBHelper.convert(block)).setNewGenStamp(newgenerationstamp)
@ -301,6 +302,7 @@ public void commitBlockSynchronization(ExtendedBlock block,
.setDeleteBlock(deleteblock);
for (int i = 0; i < newtargets.length; i++) {
builder.addNewTaragets(PBHelper.convert(newtargets[i]));
builder.addNewTargetStorages(newtargetstorages[i]);
}
CommitBlockSynchronizationRequestProto req = builder.build();
try {

View File

@ -259,10 +259,12 @@ public CommitBlockSynchronizationResponseProto commitBlockSynchronization(
for (int i = 0; i < dnprotos.size(); i++) {
dns[i] = PBHelper.convert(dnprotos.get(i));
}
final List<String> sidprotos = request.getNewTargetStoragesList();
final String[] storageIDs = sidprotos.toArray(new String[sidprotos.size()]);
try {
impl.commitBlockSynchronization(PBHelper.convert(request.getBlock()),
request.getNewGenStamp(), request.getNewLength(),
request.getCloseFile(), request.getDeleteBlock(), dns);
request.getCloseFile(), request.getDeleteBlock(), dns, storageIDs);
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -20,7 +20,6 @@
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.InitReplicaRecoveryResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.InterDatanodeProtocolProtos.UpdateReplicaUnderRecoveryRequestProto;
@ -66,14 +65,15 @@ public InitReplicaRecoveryResponseProto initReplicaRecovery(
public UpdateReplicaUnderRecoveryResponseProto updateReplicaUnderRecovery(
RpcController unused, UpdateReplicaUnderRecoveryRequestProto request)
throws ServiceException {
ExtendedBlock b;
final String storageID;
try {
b = impl.updateReplicaUnderRecovery(PBHelper.convert(request.getBlock()),
storageID = impl.updateReplicaUnderRecovery(
PBHelper.convert(request.getBlock()),
request.getRecoveryId(), request.getNewLength());
} catch (IOException e) {
throw new ServiceException(e);
}
return UpdateReplicaUnderRecoveryResponseProto.newBuilder()
.setBlock(PBHelper.convert(b)).build();
.setStorageID(storageID).build();
}
}

View File

@ -91,15 +91,15 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
}
@Override
public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newLength) throws IOException {
UpdateReplicaUnderRecoveryRequestProto req =
UpdateReplicaUnderRecoveryRequestProto.newBuilder()
.setBlock(PBHelper.convert(oldBlock))
.setNewLength(newLength).setRecoveryId(recoveryId).build();
try {
return PBHelper.convert(rpcProxy.updateReplicaUnderRecovery(
NULL_CONTROLLER, req).getBlock());
return rpcProxy.updateReplicaUnderRecovery(NULL_CONTROLLER, req
).getStorageID();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}

View File

@ -1770,10 +1770,9 @@ private static ReplicaRecoveryInfo callInitReplicaRecovery(
* Update replica with the new generation stamp and length.
*/
@Override // InterDatanodeProtocol
public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newLength) throws IOException {
ReplicaInfo r = data.updateReplicaUnderRecovery(oldBlock,
public String updateReplicaUnderRecovery(final ExtendedBlock oldBlock,
final long recoveryId, final long newLength) throws IOException {
final String storageID = data.updateReplicaUnderRecovery(oldBlock,
recoveryId, newLength);
// Notify the namenode of the updated block info. This is important
// for HA, since otherwise the standby node may lose track of the
@ -1782,7 +1781,7 @@ public ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
newBlock.setGenerationStamp(recoveryId);
newBlock.setNumBytes(newLength);
notifyNamenodeReceivedBlock(newBlock, "");
return new ExtendedBlock(oldBlock.getBlockPoolId(), r);
return storageID;
}
/** A convenient class used in block recovery */
@ -1791,6 +1790,8 @@ static class BlockRecord {
final InterDatanodeProtocol datanode;
final ReplicaRecoveryInfo rInfo;
private String storageID;
BlockRecord(DatanodeID id,
InterDatanodeProtocol datanode,
ReplicaRecoveryInfo rInfo) {
@ -1799,6 +1800,12 @@ static class BlockRecord {
this.rInfo = rInfo;
}
void updateReplicaUnderRecovery(String bpid, long recoveryId, long newLength
) throws IOException {
final ExtendedBlock b = new ExtendedBlock(bpid, rInfo);
storageID = datanode.updateReplicaUnderRecovery(b, recoveryId, newLength);
}
@Override
public String toString() {
return "block:" + rInfo + " node:" + id;
@ -1875,6 +1882,7 @@ public DatanodeProtocolClientSideTranslatorPB getActiveNamenodeForBP(String bpid
void syncBlock(RecoveringBlock rBlock,
List<BlockRecord> syncList) throws IOException {
ExtendedBlock block = rBlock.getBlock();
final String bpid = block.getBlockPoolId();
DatanodeProtocolClientSideTranslatorPB nn =
getActiveNamenodeForBP(block.getBlockPoolId());
if (nn == null) {
@ -1894,7 +1902,7 @@ void syncBlock(RecoveringBlock rBlock,
// The block can be deleted.
if (syncList.isEmpty()) {
nn.commitBlockSynchronization(block, recoveryId, 0,
true, true, DatanodeID.EMPTY_ARRAY);
true, true, DatanodeID.EMPTY_ARRAY, null);
return;
}
@ -1917,8 +1925,8 @@ void syncBlock(RecoveringBlock rBlock,
// Calculate list of nodes that will participate in the recovery
// and the new block size
List<BlockRecord> participatingList = new ArrayList<BlockRecord>();
final ExtendedBlock newBlock = new ExtendedBlock(block.getBlockPoolId(), block
.getBlockId(), -1, recoveryId);
final ExtendedBlock newBlock = new ExtendedBlock(bpid, block.getBlockId(),
-1, recoveryId);
switch(bestState) {
case FINALIZED:
assert finalizedLength > 0 : "finalizedLength is not positive";
@ -1949,16 +1957,11 @@ void syncBlock(RecoveringBlock rBlock,
}
List<DatanodeID> failedList = new ArrayList<DatanodeID>();
List<DatanodeID> successList = new ArrayList<DatanodeID>();
final List<BlockRecord> successList = new ArrayList<BlockRecord>();
for(BlockRecord r : participatingList) {
try {
ExtendedBlock reply = r.datanode.updateReplicaUnderRecovery(
new ExtendedBlock(newBlock.getBlockPoolId(), r.rInfo), recoveryId,
newBlock.getNumBytes());
assert reply.equals(newBlock) &&
reply.getNumBytes() == newBlock.getNumBytes() :
"Updated replica must be the same as the new block.";
successList.add(r.id);
r.updateReplicaUnderRecovery(bpid, recoveryId, newBlock.getNumBytes());
successList.add(r);
} catch (IOException e) {
InterDatanodeProtocol.LOG.warn("Failed to updateBlock (newblock="
+ newBlock + ", datanode=" + r.id + ")", e);
@ -1979,10 +1982,16 @@ void syncBlock(RecoveringBlock rBlock,
}
// Notify the name-node about successfully recovered replicas.
DatanodeID[] nlist = successList.toArray(new DatanodeID[successList.size()]);
final DatanodeID[] datanodes = new DatanodeID[successList.size()];
final String[] storages = new String[datanodes.length];
for(int i = 0; i < datanodes.length; i++) {
final BlockRecord r = successList.get(i);
datanodes[i] = r.id;
storages[i] = r.storageID;
}
nn.commitBlockSynchronization(block,
newBlock.getGenerationStamp(), newBlock.getNumBytes(), true, false,
nlist);
datanodes, storages);
}
private static void logRecoverBlock(String who,

View File

@ -553,14 +553,16 @@ public void shutdown() {
*/
static class FSVolume implements FsVolumeSpi {
private final FSDataset dataset;
private final String storageID;
private final Map<String, BlockPoolSlice> map = new HashMap<String, BlockPoolSlice>();
private final File currentDir; // <StorageDirectory>/current
private final DF usage;
private final long reserved;
FSVolume(FSDataset dataset, File currentDir, Configuration conf
) throws IOException {
FSVolume(FSDataset dataset, String storageID, File currentDir,
Configuration conf) throws IOException {
this.dataset = dataset;
this.storageID = storageID;
this.reserved = conf.getLong(DFSConfigKeys.DFS_DATANODE_DU_RESERVED_KEY,
DFSConfigKeys.DFS_DATANODE_DU_RESERVED_DEFAULT);
this.currentDir = currentDir;
@ -808,6 +810,10 @@ private void deleteBPDirectories(String bpid, boolean force)
}
}
}
String getStorageID() {
return storageID;
}
}
static class FSVolumeSet {
@ -1017,6 +1023,12 @@ public List<FSVolume> getVolumes() {
return volumes.volumes;
}
@Override
public synchronized FSVolume getVolume(final ExtendedBlock b) {
final ReplicaInfo r = volumeMap.get(b.getBlockPoolId(), b.getLocalBlock());
return r != null? (FSVolume)r.getVolume(): null;
}
@Override // FSDatasetInterface
public synchronized Block getStoredBlock(String bpid, long blkid)
throws IOException {
@ -1107,7 +1119,7 @@ private FSDataset(DataNode datanode, DataStorage storage, Configuration conf
storage.getNumStorageDirs());
for (int idx = 0; idx < storage.getNumStorageDirs(); idx++) {
final File dir = storage.getStorageDir(idx).getCurrentDir();
volArray.add(new FSVolume(this, dir, conf));
volArray.add(new FSVolume(this, storage.getStorageID(), dir, conf));
DataNode.LOG.info("FSDataset added volume - " + dir);
}
volumeMap = new ReplicasMap(this);
@ -1758,19 +1770,6 @@ public void adjustCrcChannelPosition(ExtendedBlock b, ReplicaOutputStreams strea
channel.position(newPos);
}
synchronized File createTmpFile(FSVolume vol, String bpid, Block blk) throws IOException {
if ( vol == null ) {
ReplicaInfo replica = volumeMap.get(bpid, blk);
if (replica != null) {
vol = (FSVolume)volumeMap.get(bpid, blk).getVolume();
}
if ( vol == null ) {
throw new IOException("Could not find volume for block " + blk);
}
}
return vol.createTmpFile(bpid, blk);
}
//
// REMIND - mjc - eventually we should have a timeout system
// in place to clean up block files left by abandoned clients.
@ -2421,13 +2420,13 @@ static ReplicaRecoveryInfo initReplicaRecovery(String bpid,
}
@Override // FSDatasetInterface
public synchronized ReplicaInfo updateReplicaUnderRecovery(
public synchronized String updateReplicaUnderRecovery(
final ExtendedBlock oldBlock,
final long recoveryId,
final long newlength) throws IOException {
//get replica
final ReplicaInfo replica = volumeMap.get(oldBlock.getBlockPoolId(),
oldBlock.getBlockId());
final String bpid = oldBlock.getBlockPoolId();
final ReplicaInfo replica = volumeMap.get(bpid, oldBlock.getBlockId());
DataNode.LOG.info("updateReplica: block=" + oldBlock
+ ", recoveryId=" + recoveryId
+ ", length=" + newlength
@ -2457,10 +2456,18 @@ public synchronized ReplicaInfo updateReplicaUnderRecovery(
//update replica
final FinalizedReplica finalized = updateReplicaUnderRecovery(oldBlock
.getBlockPoolId(), (ReplicaUnderRecovery) replica, recoveryId, newlength);
assert finalized.getBlockId() == oldBlock.getBlockId()
&& finalized.getGenerationStamp() == recoveryId
&& finalized.getNumBytes() == newlength
: "Replica information mismatched: oldBlock=" + oldBlock
+ ", recoveryId=" + recoveryId + ", newlength=" + newlength
+ ", finalized=" + finalized;
//check replica files after update
checkReplicaFiles(finalized);
return finalized;
//return storage ID
return getVolume(new ExtendedBlock(bpid, finalized)).getStorageID();
}
private FinalizedReplica updateReplicaUnderRecovery(

View File

@ -87,6 +87,9 @@ public boolean isSimulated() {
/** @return a list of volumes. */
public List<V> getVolumes();
/** @return the volume that contains a replica of the block. */
public V getVolume(ExtendedBlock b);
/** @return a volume information map (name => info). */
public Map<String, Object> getVolumeInfoMap();
@ -336,11 +339,11 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
/**
* Update replica's generation stamp and length and finalize it.
* @return the ID of storage that stores the block
*/
public ReplicaInfo updateReplicaUnderRecovery(
ExtendedBlock oldBlock,
long recoveryId,
long newLength) throws IOException;
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId, long newLength) throws IOException;
/**
* add new block pool ID
* @param bpid Block pool Id

View File

@ -2828,7 +2828,8 @@ private void finalizeINodeFileUnderConstruction(String src,
void commitBlockSynchronization(ExtendedBlock lastblock,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
String[] newtargetstorages)
throws IOException, UnresolvedLinkException {
String src = "";
writeLock();

View File

@ -547,10 +547,11 @@ public void updatePipeline(String clientName, ExtendedBlock oldBlock,
@Override // DatanodeProtocol
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets)
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
String[] newtargetstorages)
throws IOException {
namesystem.commitBlockSynchronization(block,
newgenerationstamp, newlength, closeFile, deleteblock, newtargets);
namesystem.commitBlockSynchronization(block, newgenerationstamp,
newlength, closeFile, deleteblock, newtargets, newtargetstorages);
}
@Override // ClientProtocol

View File

@ -176,6 +176,6 @@ public void errorReport(DatanodeRegistration registration,
*/
public void commitBlockSynchronization(ExtendedBlock block,
long newgenerationstamp, long newlength,
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets
) throws IOException;
boolean closeFile, boolean deleteblock, DatanodeID[] newtargets,
String[] newtargetstorages) throws IOException;
}

View File

@ -26,7 +26,6 @@
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
/** An inter-datanode protocol for updating generation stamp
@ -55,9 +54,6 @@ public interface InterDatanodeProtocol {
*
* For more details on protocol buffer wire protocol, please see
* .../org/apache/hadoop/hdfs/protocolPB/overview.html
*
* The log of historical changes can be retrieved from the svn).
* 6: Add block pool ID to Block
*/
public static final long versionID = 6L;
@ -73,7 +69,6 @@ ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
/**
* Update replica with the new generation stamp and length.
*/
ExtendedBlock updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newLength) throws IOException;
String updateReplicaUnderRecovery(ExtendedBlock oldBlock, long recoveryId,
long newLength) throws IOException;
}

View File

@ -339,6 +339,7 @@ message CommitBlockSynchronizationRequestProto {
required bool closeFile = 4;
required bool deleteBlock = 5;
repeated DatanodeIDProto newTaragets = 6;
repeated string newTargetStorages = 7;
}
/**

View File

@ -55,7 +55,7 @@ message UpdateReplicaUnderRecoveryRequestProto {
* Response returns updated block information
*/
message UpdateReplicaUnderRecoveryResponseProto {
required ExtendedBlockProto block = 1; // Updated block information
required string storageID = 1; // ID of the storage that stores replica
}
/**

View File

@ -903,11 +903,10 @@ public ReplicaRecoveryInfo initReplicaRecovery(RecoveringBlock rBlock)
}
@Override // FSDatasetInterface
public FinalizedReplica updateReplicaUnderRecovery(ExtendedBlock oldBlock,
public String updateReplicaUnderRecovery(ExtendedBlock oldBlock,
long recoveryId,
long newlength) {
return new FinalizedReplica(
oldBlock.getBlockId(), newlength, recoveryId, null, null);
return storageId;
}
@Override // FSDatasetInterface
@ -985,4 +984,9 @@ public Map<String, Object> getVolumeInfoMap() {
public RollingLogs createRollingLogs(String bpid, String prefix) {
throw new UnsupportedOperationException();
}
@Override
public FsVolumeSpi getVolume(ExtendedBlock b) {
throw new UnsupportedOperationException();
}
}

View File

@ -18,6 +18,27 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.commons.logging.impl.Log4JLogger;
@ -34,10 +55,10 @@
import org.apache.hadoop.hdfs.protocol.RecoveryInProgressException;
import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.ReplicaState;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.datanode.DataNode.BlockRecord;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams;
import org.apache.hadoop.hdfs.server.namenode.FSNamesystem;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
@ -45,10 +66,9 @@
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.InterDatanodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReplicaRecoveryInfo;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat.State;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Daemon;
@ -62,16 +82,6 @@
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;
import java.io.File;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
/**
* This tests if sync all replicas in block recovery works correctly
*/
@ -196,11 +206,9 @@ private void testSyncReplicas(ReplicaRecoveryInfo replica1,
syncList.add(record2);
when(dn1.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong())).thenReturn(new ExtendedBlock(block.getBlockPoolId(),
block.getBlockId(), expectLen, block.getGenerationStamp()));
anyLong())).thenReturn("storage1");
when(dn2.updateReplicaUnderRecovery((ExtendedBlock)anyObject(), anyLong(),
anyLong())).thenReturn(new ExtendedBlock(block.getBlockPoolId(),
block.getBlockId(), expectLen, block.getGenerationStamp()));
anyLong())).thenReturn("storage2");
dn.syncBlock(rBlock, syncList);
}
@ -463,7 +471,7 @@ public void testZeroLenReplicas() throws IOException, InterruptedException {
d.join();
DatanodeProtocol dnP = dn.getActiveNamenodeForBP(POOL_ID);
verify(dnP).commitBlockSynchronization(
block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY);
block, RECOVERY_ID, 0, true, true, DatanodeID.EMPTY_ARRAY, null);
}
private List<BlockRecord> initBlockRecords(DataNode spyDN) throws IOException {
@ -521,7 +529,7 @@ public void testNoReplicaUnderRecovery() throws IOException {
DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
verify(namenode, never()).commitBlockSynchronization(
any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
anyBoolean(), any(DatanodeID[].class));
anyBoolean(), any(DatanodeID[].class), any(String[].class));
}
/**
@ -550,7 +558,7 @@ public void testNotMatchedReplicaID() throws IOException {
DatanodeProtocol namenode = dn.getActiveNamenodeForBP(POOL_ID);
verify(namenode, never()).commitBlockSynchronization(
any(ExtendedBlock.class), anyLong(), anyLong(), anyBoolean(),
anyBoolean(), any(DatanodeID[].class));
anyBoolean(), any(DatanodeID[].class), any(String[].class));
} finally {
streams.close();
}

View File

@ -329,14 +329,9 @@ public void testUpdateReplicaUnderRecovery() throws IOException {
}
//update
final ReplicaInfo finalized = fsdataset.updateReplicaUnderRecovery(
final String storageID = fsdataset.updateReplicaUnderRecovery(
new ExtendedBlock(b.getBlockPoolId(), rri), recoveryid, newlength);
//check meta data after update
FSDataset.checkReplicaFiles(finalized);
Assert.assertEquals(b.getBlockId(), finalized.getBlockId());
Assert.assertEquals(recoveryid, finalized.getGenerationStamp());
Assert.assertEquals(newlength, finalized.getNumBytes());
assertTrue(storageID != null);
} finally {
if (cluster != null) cluster.shutdown();

View File

@ -307,7 +307,8 @@ public void testFailoverRightBeforeCommitSynchronization() throws Exception {
Mockito.anyLong(), // new length
Mockito.eq(true), // close file
Mockito.eq(false), // delete block
(DatanodeID[]) Mockito.anyObject()); // new targets
(DatanodeID[]) Mockito.anyObject(), // new targets
(String[]) Mockito.anyObject()); // new target storages
DistributedFileSystem fsOtherUser = createFsAsOtherUser(cluster, conf);
assertFalse(fsOtherUser.recoverLease(TEST_PATH));