HDFS-6925. DataNode should attempt to place replicas on transient storage first if lazyPersist flag is received. (Arpit Agarwal)

This commit is contained in:
arp 2014-08-27 15:35:47 -07:00 committed by arp7
parent c2354a7f81
commit a317bd7b02
24 changed files with 234 additions and 71 deletions

View File

@ -10,3 +10,6 @@
HDFS-6923. Propagate LazyPersist flag to DNs via DataTransferProtocol.
(Arpit Agarwal)
HDFS-6925. DataNode should attempt to place replicas on transient storage
first if lazyPersist flag is received. (Arpit Agarwal)

View File

@ -139,7 +139,8 @@ class BlockReceiver implements Closeable {
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
final String clientname, final DatanodeInfo srcDataNode,
final DataNode datanode, DataChecksum requestedChecksum,
CachingStrategy cachingStrategy) throws IOException {
CachingStrategy cachingStrategy,
final boolean allowLazyPersist) throws IOException {
try{
this.block = block;
this.in = in;
@ -180,7 +181,7 @@ class BlockReceiver implements Closeable {
} else {
switch (stage) {
case PIPELINE_SETUP_CREATE:
replicaInfo = datanode.data.createRbw(storageType, block);
replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
datanode.notifyNamenodeReceivingBlock(
block, replicaInfo.getStorageUuid());
break;

View File

@ -607,7 +607,7 @@ class DataXceiver extends Receiver implements Runnable {
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy);
cachingStrategy, allowLazyPersist);
storageUuid = blockReceiver.getStorageUuid();
} else {
@ -1048,7 +1048,7 @@ class DataXceiver extends Receiver implements Runnable {
proxyReply, proxySock.getRemoteSocketAddress().toString(),
proxySock.getLocalSocketAddress().toString(),
null, 0, 0, 0, "", null, datanode, remoteChecksum,
CachingStrategy.newDropBehind());
CachingStrategy.newDropBehind(), false);
// receive a block
blockReceiver.receiveBlock(null, null, replyOut, null,

View File

@ -399,7 +399,7 @@ public class DirectoryScanner implements Runnable {
/**
* Reconcile differences between disk and in-memory blocks
*/
void reconcile() {
void reconcile() throws IOException {
scan();
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
String bpid = entry.getKey();

View File

@ -59,4 +59,9 @@ public interface Replica {
* Return the storageUuid of the volume that stores this replica.
*/
public String getStorageUuid();
/**
* Return true if the target volume is backed by RAM.
*/
public boolean isOnTransientStorage();
}

View File

@ -61,17 +61,6 @@ abstract public class ReplicaInfo extends Block implements Replica {
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
/**
* Constructor for a zero length replica
* @param blockId block id
* @param genStamp replica generation stamp
* @param vol volume where replica is located
* @param dir directory path where block and meta files are located
*/
ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) {
this( blockId, 0L, genStamp, vol, dir);
}
/**
* Constructor
* @param block a block
@ -289,20 +278,6 @@ abstract public class ReplicaInfo extends Block implements Replica {
return true;
}
/**
* Set this replica's generation stamp to be a newer one
* @param newGS new generation stamp
* @throws IOException is the new generation stamp is not greater than the current one
*/
void setNewerGenerationStamp(long newGS) throws IOException {
long curGS = getGenerationStamp();
if (newGS <= curGS) {
throw new IOException("New generation stamp (" + newGS
+ ") must be greater than current one (" + curGS + ")");
}
setGenerationStamp(newGS);
}
@Override //Object
public String toString() {
return getClass().getSimpleName()
@ -314,4 +289,9 @@ abstract public class ReplicaInfo extends Block implements Replica {
+ "\n getVolume() = " + getVolume()
+ "\n getBlockFile() = " + getBlockFile();
}
@Override
public boolean isOnTransientStorage() {
return volume.isTransientStorage();
}
}

View File

@ -37,8 +37,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
// that the replica will be bumped to after recovery
public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
replica.getVolume(), replica.getDir());
super(replica, replica.getVolume(), replica.getDir());
if ( replica.getState() != ReplicaState.FINALIZED &&
replica.getState() != ReplicaState.RBW &&
replica.getState() != ReplicaState.RWR ) {

View File

@ -99,7 +99,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
@Override
public synchronized V chooseVolume(List<V> volumes,
final long replicaSize) throws IOException {
long replicaSize) throws IOException {
if (volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}

View File

@ -119,7 +119,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* as corrupted.
*/
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FsVolumeSpi vol);
File diskMetaFile, FsVolumeSpi vol) throws IOException;
/**
* @param b - the block
@ -194,7 +194,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
* @throws IOException if an error occurs
*/
public ReplicaInPipelineInterface createRbw(StorageType storageType,
ExtendedBlock b) throws IOException;
ExtendedBlock b, boolean allowLazyPersist) throws IOException;
/**
* Recovers a RBW replica and returns the meta info of the replica

View File

@ -45,4 +45,7 @@ public interface FsVolumeSpi {
public File getFinalizedDir(String bpid) throws IOException;
public StorageType getStorageType();
/** Returns true if the volume is NOT backed by persistent storage. */
public boolean isTransientStorage();
}

View File

@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.StorageType;
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
/**
@ -27,12 +30,14 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
*/
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
implements VolumeChoosingPolicy<V> {
public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);
private int curVolume = 0;
@Override
public synchronized V chooseVolume(final List<V> volumes, final long blockSize
) throws IOException {
public synchronized V chooseVolume(final List<V> volumes, long blockSize)
throws IOException {
if(volumes.size() < 1) {
throw new DiskOutOfSpaceException("No more available volumes");
}
@ -50,7 +55,9 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
final V volume = volumes.get(curVolume);
curVolume = (curVolume + 1) % volumes.size();
long availableVolumeSize = volume.getAvailable();
if (availableVolumeSize > blockSize) { return volume; }
if (availableVolumeSize > blockSize) {
return volume;
}
if (availableVolumeSize > maxAvailable) {
maxAvailable = availableVolumeSize;

View File

@ -39,6 +39,9 @@ import javax.management.NotCompliantMBeanException;
import javax.management.ObjectName;
import javax.management.StandardMBean;
import com.google.common.base.Preconditions;
import com.google.common.collect.TreeMultimap;
import org.apache.commons.io.FileUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
@ -271,7 +274,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
// nothing needed to be rolled back to make various data structures, e.g.,
// storageMap and asyncDiskService, consistent.
FsVolumeImpl fsVolume = new FsVolumeImpl(
FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
this, sd.getStorageUuid(), dir, this.conf, storageType);
fsVolume.getVolumeMap(volumeMap);
@ -417,16 +420,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
* Get File name for a given block.
*/
private File getBlockFile(ExtendedBlock b) throws IOException {
return getBlockFile(b.getBlockPoolId(), b.getLocalBlock());
return getBlockFile(b.getBlockPoolId(), b.getBlockId());
}
/**
* Get File name for a given block.
*/
File getBlockFile(String bpid, Block b) throws IOException {
File f = validateBlockFile(bpid, b);
File getBlockFile(String bpid, long blockId) throws IOException {
File f = validateBlockFile(bpid, blockId);
if(f == null) {
throw new IOException("Block " + b + " is not valid.");
throw new IOException("BlockId " + blockId + " is not valid.");
}
return f;
}
@ -816,7 +819,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
@Override // FsDatasetSpi
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
ExtendedBlock b) throws IOException {
ExtendedBlock b, boolean allowLazyPersist) throws IOException {
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
b.getBlockId());
if (replicaInfo != null) {
@ -825,8 +828,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
" and thus cannot be created.");
}
// create a new block
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
// create a rbw file to hold block in the designated volume
FsVolumeImpl v;
while (true) {
try {
if (allowLazyPersist) {
// First try to place the block on a transient volume.
v = volumes.getNextTransientVolume(b.getNumBytes());
} else {
v = volumes.getNextVolume(storageType, b.getNumBytes());
}
} catch (DiskOutOfSpaceException de) {
if (allowLazyPersist) {
allowLazyPersist = false;
continue;
}
throw de;
}
break;
}
// create an rbw file to hold block in the designated volume
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
b.getGenerationStamp(), v, f.getParentFile());
@ -1179,11 +1199,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
/**
* Find the file corresponding to the block and return it if it exists.
*/
File validateBlockFile(String bpid, Block b) {
File validateBlockFile(String bpid, long blockId) {
//Should we check for metadata file too?
final File f;
synchronized(this) {
f = getFile(bpid, b.getBlockId());
f = getFile(bpid, blockId);
}
if(f != null ) {
@ -1195,7 +1215,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
}
if (LOG.isDebugEnabled()) {
LOG.debug("b=" + b + ", f=" + f);
LOG.debug("blockId=" + blockId + ", f=" + f);
}
return null;
}
@ -1330,6 +1350,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
": volume was not an instance of FsVolumeImpl.");
return;
}
if (volume.isTransientStorage()) {
LOG.warn("Caching not supported on block with id " + blockId +
" since the volume is backed by RAM.");
return;
}
success = true;
} finally {
if (!success) {

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.ThreadPoolExecutor;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.StorageType;
/**
* Volume for storing replicas in memory. These can be deleted at any time
* to make space for new replicas and there is no persistence guarantee.
*
* The backing store for these replicas is expected to be RAM_DISK.
* The backing store may be disk when testing.
*
* It uses the {@link FsDatasetImpl} object for synchronization.
*/
@InterfaceAudience.Private
@VisibleForTesting
public class FsTransientVolumeImpl extends FsVolumeImpl {
FsTransientVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
Configuration conf, StorageType storageType)
throws IOException {
super(dataset, storageID, currentDir, conf, storageType);
}
@Override
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
// Can't 'cache' replicas already in RAM.
return null;
}
@Override
public boolean isTransientStorage() {
return true;
}
}

View File

@ -73,7 +73,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
* dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
* contention.
*/
private final ThreadPoolExecutor cacheExecutor;
protected ThreadPoolExecutor cacheExecutor;
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
Configuration conf, StorageType storageType) throws IOException {
@ -191,6 +191,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
return currentDir.getParent();
}
@Override
public boolean isTransientStorage() {
return false;
}
@Override
public String getPath(String bpid) throws IOException {
return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
@ -272,7 +277,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
}
void shutdown() {
cacheExecutor.shutdown();
if (cacheExecutor != null) {
cacheExecutor.shutdown();
}
Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
for (Entry<String, BlockPoolSlice> entry : set) {
entry.getValue().shutdown();
@ -365,6 +372,5 @@ public class FsVolumeImpl implements FsVolumeSpi {
DatanodeStorage toDatanodeStorage() {
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
}
}

View File

@ -0,0 +1,44 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
import java.io.File;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.StorageType;
/**
* Generate volumes based on the storageType.
*/
@InterfaceAudience.Private
class FsVolumeImplAllocator {
static FsVolumeImpl createVolume(FsDatasetImpl fsDataset, String storageUuid,
File dir, Configuration conf, StorageType storageType)
throws IOException {
switch(storageType) {
case RAM_DISK:
return new FsTransientVolumeImpl(
fsDataset, storageUuid, dir, conf, storageType);
default:
return new FsVolumeImpl(
fsDataset, storageUuid, dir, conf, storageType);
}
}
}

View File

@ -69,6 +69,24 @@ class FsVolumeList {
return blockChooser.chooseVolume(list, blockSize);
}
/**
* Get next volume. Synchronized to ensure {@link #curVolume} is updated
* by a single thread and next volume is chosen with no concurrent
* update to {@link #volumes}.
* @param blockSize free space needed on the volume
* @return next volume to store the block in.
*/
synchronized FsVolumeImpl getNextTransientVolume(
long blockSize) throws IOException {
final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
for(FsVolumeImpl v : volumes) {
if (v.isTransientStorage()) {
list.add(v);
}
}
return blockChooser.chooseVolume(list, blockSize);
}
long getDfsUsed() throws IOException {
long dfsUsed = 0L;
for (FsVolumeImpl v : volumes) {

View File

@ -98,9 +98,10 @@ public class TestWriteBlockGetsBlockLengthHint {
*/
@Override
public synchronized ReplicaInPipelineInterface createRbw(
StorageType storageType, ExtendedBlock b) throws IOException {
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
throws IOException {
assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
return super.createRbw(storageType, b);
return super.createRbw(storageType, b, allowLazyPersist);
}
}
}

View File

@ -364,7 +364,7 @@ public abstract class BlockReportTestBase {
// Create a bogus new block which will not be present on the namenode.
ExtendedBlock b = new ExtendedBlock(
poolId, rand.nextLong(), 1024L, rand.nextLong());
dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);

View File

@ -300,6 +300,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
public ChunkChecksum getLastChecksumAndDataLen() {
return new ChunkChecksum(oStream.getLength(), null);
}
@Override
public boolean isOnTransientStorage() {
return false;
}
}
/**
@ -746,7 +751,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override // FsDatasetSpi
public synchronized ReplicaInPipelineInterface createRbw(
StorageType storageType, ExtendedBlock b) throws IOException {
StorageType storageType, ExtendedBlock b,
boolean allowLazyPersist) throws IOException {
return createTemporary(storageType, b);
}
@ -1074,7 +1080,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
@Override
public void checkAndUpdate(String bpid, long blockId, File diskFile,
File diskMetaFile, FsVolumeSpi vol) {
File diskMetaFile, FsVolumeSpi vol) throws IOException {
throw new UnsupportedOperationException();
}

View File

@ -532,7 +532,7 @@ public class TestBlockRecovery {
if(LOG.isDebugEnabled()) {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
dn.data.createRbw(StorageType.DEFAULT, block);
dn.data.createRbw(StorageType.DEFAULT, block, false);
try {
dn.syncBlock(rBlock, initBlockRecords(dn));
fail("Sync should fail");
@ -556,7 +556,7 @@ public class TestBlockRecovery {
LOG.debug("Running " + GenericTestUtils.getMethodName());
}
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
StorageType.DEFAULT, block);
StorageType.DEFAULT, block, false);
ReplicaOutputStreams streams = null;
try {
streams = replicaInfo.createStreams(true,

View File

@ -215,7 +215,7 @@ public class TestDirectoryScanner {
}
private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
long missingMemoryBlocks, long mismatchBlocks) {
long missingMemoryBlocks, long mismatchBlocks) throws IOException {
scanner.reconcile();
assertTrue(scanner.diffs.containsKey(bpid));
@ -424,6 +424,11 @@ public class TestDirectoryScanner {
public String getStorageID() {
return "";
}
@Override
public boolean isTransientStorage() {
return false;
}
}
private final static TestFsVolumeSpi TEST_VOLUME = new TestFsVolumeSpi();

View File

@ -67,7 +67,7 @@ public class TestSimulatedFSDataset {
// we pass expected len as zero, - fsdataset should use the sizeof actual
// data written
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
StorageType.DEFAULT, b);
StorageType.DEFAULT, b, false);
ReplicaOutputStreams out = bInfo.createStreams(true,
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
try {

View File

@ -34,7 +34,7 @@ public class FsDatasetTestUtil {
public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
) throws IOException {
return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
return ((FsDatasetImpl)fsd).getBlockFile(bpid, b.getBlockId());
}
public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)

View File

@ -358,7 +358,7 @@ public class TestWriteToReplica {
}
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]);
dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
Assert.fail("Should not have created a replica that's already " +
"finalized " + blocks[FINALIZED]);
} catch (ReplicaAlreadyExistsException e) {
@ -376,7 +376,7 @@ public class TestWriteToReplica {
}
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]);
dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
Assert.fail("Should not have created a replica that had created as " +
"temporary " + blocks[TEMPORARY]);
} catch (ReplicaAlreadyExistsException e) {
@ -386,7 +386,7 @@ public class TestWriteToReplica {
0L, blocks[RBW].getNumBytes()); // expect to be successful
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]);
dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
Assert.fail("Should not have created a replica that had created as RBW " +
blocks[RBW]);
} catch (ReplicaAlreadyExistsException e) {
@ -402,7 +402,7 @@ public class TestWriteToReplica {
}
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]);
dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
Assert.fail("Should not have created a replica that was waiting to be " +
"recovered " + blocks[RWR]);
} catch (ReplicaAlreadyExistsException e) {
@ -418,7 +418,7 @@ public class TestWriteToReplica {
}
try {
dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]);
dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
Assert.fail("Should not have created a replica that was under recovery " +
blocks[RUR]);
} catch (ReplicaAlreadyExistsException e) {
@ -435,7 +435,7 @@ public class TestWriteToReplica {
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
}
dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]);
dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
}
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {