HDFS-6925. DataNode should attempt to place replicas on transient storage first if lazyPersist flag is received. (Arpit Agarwal)
Conflicts: hadoop-hdfs-project/hadoop-hdfs/CHANGES-HDFS-6581.txt hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/FsVolumeSpi.java hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/fsdataset/impl/FsDatasetImpl.java hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDirectoryScanner.java
This commit is contained in:
parent
8c3c0ec977
commit
34d0088bf9
|
@ -139,7 +139,8 @@ class BlockReceiver implements Closeable {
|
|||
final long newGs, final long minBytesRcvd, final long maxBytesRcvd,
|
||||
final String clientname, final DatanodeInfo srcDataNode,
|
||||
final DataNode datanode, DataChecksum requestedChecksum,
|
||||
CachingStrategy cachingStrategy) throws IOException {
|
||||
CachingStrategy cachingStrategy,
|
||||
final boolean allowLazyPersist) throws IOException {
|
||||
try{
|
||||
this.block = block;
|
||||
this.in = in;
|
||||
|
@ -180,7 +181,7 @@ class BlockReceiver implements Closeable {
|
|||
} else {
|
||||
switch (stage) {
|
||||
case PIPELINE_SETUP_CREATE:
|
||||
replicaInfo = datanode.data.createRbw(storageType, block);
|
||||
replicaInfo = datanode.data.createRbw(storageType, block, allowLazyPersist);
|
||||
datanode.notifyNamenodeReceivingBlock(
|
||||
block, replicaInfo.getStorageUuid());
|
||||
break;
|
||||
|
|
|
@ -607,8 +607,8 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
peer.getLocalAddressString(),
|
||||
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
|
||||
clientname, srcDataNode, datanode, requestedChecksum,
|
||||
cachingStrategy);
|
||||
|
||||
cachingStrategy, allowLazyPersist);
|
||||
|
||||
storageUuid = blockReceiver.getStorageUuid();
|
||||
} else {
|
||||
storageUuid = datanode.data.recoverClose(
|
||||
|
@ -1048,7 +1048,7 @@ class DataXceiver extends Receiver implements Runnable {
|
|||
proxyReply, proxySock.getRemoteSocketAddress().toString(),
|
||||
proxySock.getLocalSocketAddress().toString(),
|
||||
null, 0, 0, 0, "", null, datanode, remoteChecksum,
|
||||
CachingStrategy.newDropBehind());
|
||||
CachingStrategy.newDropBehind(), false);
|
||||
|
||||
// receive a block
|
||||
blockReceiver.receiveBlock(null, null, replyOut, null,
|
||||
|
|
|
@ -399,7 +399,7 @@ public class DirectoryScanner implements Runnable {
|
|||
/**
|
||||
* Reconcile differences between disk and in-memory blocks
|
||||
*/
|
||||
void reconcile() {
|
||||
void reconcile() throws IOException {
|
||||
scan();
|
||||
for (Entry<String, LinkedList<ScanInfo>> entry : diffs.entrySet()) {
|
||||
String bpid = entry.getKey();
|
||||
|
|
|
@ -59,4 +59,9 @@ public interface Replica {
|
|||
* Return the storageUuid of the volume that stores this replica.
|
||||
*/
|
||||
public String getStorageUuid();
|
||||
|
||||
/**
|
||||
* Return true if the target volume is backed by RAM.
|
||||
*/
|
||||
public boolean isOnTransientStorage();
|
||||
}
|
||||
|
|
|
@ -61,17 +61,6 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
|
||||
private static final Map<String, File> internedBaseDirs = new HashMap<String, File>();
|
||||
|
||||
/**
|
||||
* Constructor for a zero length replica
|
||||
* @param blockId block id
|
||||
* @param genStamp replica generation stamp
|
||||
* @param vol volume where replica is located
|
||||
* @param dir directory path where block and meta files are located
|
||||
*/
|
||||
ReplicaInfo(long blockId, long genStamp, FsVolumeSpi vol, File dir) {
|
||||
this( blockId, 0L, genStamp, vol, dir);
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor
|
||||
* @param block a block
|
||||
|
@ -296,20 +285,6 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set this replica's generation stamp to be a newer one
|
||||
* @param newGS new generation stamp
|
||||
* @throws IOException is the new generation stamp is not greater than the current one
|
||||
*/
|
||||
void setNewerGenerationStamp(long newGS) throws IOException {
|
||||
long curGS = getGenerationStamp();
|
||||
if (newGS <= curGS) {
|
||||
throw new IOException("New generation stamp (" + newGS
|
||||
+ ") must be greater than current one (" + curGS + ")");
|
||||
}
|
||||
setGenerationStamp(newGS);
|
||||
}
|
||||
|
||||
@Override //Object
|
||||
public String toString() {
|
||||
return getClass().getSimpleName()
|
||||
|
@ -321,4 +296,9 @@ abstract public class ReplicaInfo extends Block implements Replica {
|
|||
+ "\n getVolume() = " + getVolume()
|
||||
+ "\n getBlockFile() = " + getBlockFile();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOnTransientStorage() {
|
||||
return volume.isTransientStorage();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -37,8 +37,7 @@ public class ReplicaUnderRecovery extends ReplicaInfo {
|
|||
// that the replica will be bumped to after recovery
|
||||
|
||||
public ReplicaUnderRecovery(ReplicaInfo replica, long recoveryId) {
|
||||
super(replica.getBlockId(), replica.getNumBytes(), replica.getGenerationStamp(),
|
||||
replica.getVolume(), replica.getDir());
|
||||
super(replica, replica.getVolume(), replica.getDir());
|
||||
if ( replica.getState() != ReplicaState.FINALIZED &&
|
||||
replica.getState() != ReplicaState.RBW &&
|
||||
replica.getState() != ReplicaState.RWR ) {
|
||||
|
|
|
@ -99,7 +99,7 @@ public class AvailableSpaceVolumeChoosingPolicy<V extends FsVolumeSpi>
|
|||
|
||||
@Override
|
||||
public synchronized V chooseVolume(List<V> volumes,
|
||||
final long replicaSize) throws IOException {
|
||||
long replicaSize) throws IOException {
|
||||
if (volumes.size() < 1) {
|
||||
throw new DiskOutOfSpaceException("No more available volumes");
|
||||
}
|
||||
|
|
|
@ -122,7 +122,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
* as corrupted.
|
||||
*/
|
||||
public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
||||
File diskMetaFile, FsVolumeSpi vol);
|
||||
File diskMetaFile, FsVolumeSpi vol) throws IOException;
|
||||
|
||||
/**
|
||||
* @param b - the block
|
||||
|
@ -197,7 +197,7 @@ public interface FsDatasetSpi<V extends FsVolumeSpi> extends FSDatasetMBean {
|
|||
* @throws IOException if an error occurs
|
||||
*/
|
||||
public ReplicaInPipelineInterface createRbw(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException;
|
||||
ExtendedBlock b, boolean allowLazyPersist) throws IOException;
|
||||
|
||||
/**
|
||||
* Recovers a RBW replica and returns the meta info of the replica
|
||||
|
|
|
@ -56,4 +56,7 @@ public interface FsVolumeSpi {
|
|||
* Release disk space previously reserved for RBW block.
|
||||
*/
|
||||
public void releaseReservedSpace(long bytesToRelease);
|
||||
}
|
||||
|
||||
/** Returns true if the volume is NOT backed by persistent storage. */
|
||||
public boolean isTransientStorage();
|
||||
}
|
||||
|
|
|
@ -20,6 +20,9 @@ package org.apache.hadoop.hdfs.server.datanode.fsdataset;
|
|||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
||||
|
||||
/**
|
||||
|
@ -27,12 +30,14 @@ import org.apache.hadoop.util.DiskChecker.DiskOutOfSpaceException;
|
|||
*/
|
||||
public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
|
||||
implements VolumeChoosingPolicy<V> {
|
||||
public static final Log LOG = LogFactory.getLog(RoundRobinVolumeChoosingPolicy.class);
|
||||
|
||||
private int curVolume = 0;
|
||||
|
||||
@Override
|
||||
public synchronized V chooseVolume(final List<V> volumes, final long blockSize
|
||||
) throws IOException {
|
||||
public synchronized V chooseVolume(final List<V> volumes, long blockSize)
|
||||
throws IOException {
|
||||
|
||||
if(volumes.size() < 1) {
|
||||
throw new DiskOutOfSpaceException("No more available volumes");
|
||||
}
|
||||
|
@ -50,7 +55,9 @@ public class RoundRobinVolumeChoosingPolicy<V extends FsVolumeSpi>
|
|||
final V volume = volumes.get(curVolume);
|
||||
curVolume = (curVolume + 1) % volumes.size();
|
||||
long availableVolumeSize = volume.getAvailable();
|
||||
if (availableVolumeSize > blockSize) { return volume; }
|
||||
if (availableVolumeSize > blockSize) {
|
||||
return volume;
|
||||
}
|
||||
|
||||
if (availableVolumeSize > maxAvailable) {
|
||||
maxAvailable = availableVolumeSize;
|
||||
|
|
|
@ -45,6 +45,9 @@ import javax.management.ObjectName;
|
|||
import javax.management.StandardMBean;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.TreeMultimap;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
|
@ -278,7 +281,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
|
||||
// nothing needed to be rolled back to make various data structures, e.g.,
|
||||
// storageMap and asyncDiskService, consistent.
|
||||
FsVolumeImpl fsVolume = new FsVolumeImpl(
|
||||
FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
|
||||
this, sd.getStorageUuid(), dir, this.conf, storageType);
|
||||
ReplicaMap tempVolumeMap = new ReplicaMap(this);
|
||||
fsVolume.getVolumeMap(tempVolumeMap);
|
||||
|
@ -550,16 +553,16 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
* Get File name for a given block.
|
||||
*/
|
||||
private File getBlockFile(ExtendedBlock b) throws IOException {
|
||||
return getBlockFile(b.getBlockPoolId(), b.getLocalBlock());
|
||||
return getBlockFile(b.getBlockPoolId(), b.getBlockId());
|
||||
}
|
||||
|
||||
/**
|
||||
* Get File name for a given block.
|
||||
*/
|
||||
File getBlockFile(String bpid, Block b) throws IOException {
|
||||
File f = validateBlockFile(bpid, b);
|
||||
File getBlockFile(String bpid, long blockId) throws IOException {
|
||||
File f = validateBlockFile(bpid, blockId);
|
||||
if(f == null) {
|
||||
throw new IOException("Block " + b + " is not valid.");
|
||||
throw new IOException("BlockId " + blockId + " is not valid.");
|
||||
}
|
||||
return f;
|
||||
}
|
||||
|
@ -949,8 +952,8 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipeline createRbw(StorageType storageType,
|
||||
ExtendedBlock b) throws IOException {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||
ExtendedBlock b, boolean allowLazyPersist) throws IOException {
|
||||
ReplicaInfo replicaInfo = volumeMap.get(b.getBlockPoolId(),
|
||||
b.getBlockId());
|
||||
if (replicaInfo != null) {
|
||||
throw new ReplicaAlreadyExistsException("Block " + b +
|
||||
|
@ -958,8 +961,25 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
" and thus cannot be created.");
|
||||
}
|
||||
// create a new block
|
||||
FsVolumeImpl v = volumes.getNextVolume(storageType, b.getNumBytes());
|
||||
// create a rbw file to hold block in the designated volume
|
||||
FsVolumeImpl v;
|
||||
while (true) {
|
||||
try {
|
||||
if (allowLazyPersist) {
|
||||
// First try to place the block on a transient volume.
|
||||
v = volumes.getNextTransientVolume(b.getNumBytes());
|
||||
} else {
|
||||
v = volumes.getNextVolume(storageType, b.getNumBytes());
|
||||
}
|
||||
} catch (DiskOutOfSpaceException de) {
|
||||
if (allowLazyPersist) {
|
||||
allowLazyPersist = false;
|
||||
continue;
|
||||
}
|
||||
throw de;
|
||||
}
|
||||
break;
|
||||
}
|
||||
// create an rbw file to hold block in the designated volume
|
||||
File f = v.createRbwFile(b.getBlockPoolId(), b.getLocalBlock());
|
||||
ReplicaBeingWritten newReplicaInfo = new ReplicaBeingWritten(b.getBlockId(),
|
||||
b.getGenerationStamp(), v, f.getParentFile(), b.getNumBytes());
|
||||
|
@ -1321,11 +1341,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
/**
|
||||
* Find the file corresponding to the block and return it if it exists.
|
||||
*/
|
||||
File validateBlockFile(String bpid, Block b) {
|
||||
File validateBlockFile(String bpid, long blockId) {
|
||||
//Should we check for metadata file too?
|
||||
final File f;
|
||||
synchronized(this) {
|
||||
f = getFile(bpid, b.getBlockId());
|
||||
f = getFile(bpid, blockId);
|
||||
}
|
||||
|
||||
if(f != null ) {
|
||||
|
@ -1337,7 +1357,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
}
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("b=" + b + ", f=" + f);
|
||||
LOG.debug("blockId=" + blockId + ", f=" + f);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
@ -1497,6 +1517,11 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
": volume was not an instance of FsVolumeImpl.");
|
||||
return;
|
||||
}
|
||||
if (volume.isTransientStorage()) {
|
||||
LOG.warn("Caching not supported on block with id " + blockId +
|
||||
" since the volume is backed by RAM.");
|
||||
return;
|
||||
}
|
||||
success = true;
|
||||
} finally {
|
||||
if (!success) {
|
||||
|
|
|
@ -0,0 +1,60 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
|
||||
/**
|
||||
* Volume for storing replicas in memory. These can be deleted at any time
|
||||
* to make space for new replicas and there is no persistence guarantee.
|
||||
*
|
||||
* The backing store for these replicas is expected to be RAM_DISK.
|
||||
* The backing store may be disk when testing.
|
||||
*
|
||||
* It uses the {@link FsDatasetImpl} object for synchronization.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
public class FsTransientVolumeImpl extends FsVolumeImpl {
|
||||
|
||||
|
||||
FsTransientVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
|
||||
Configuration conf, StorageType storageType)
|
||||
throws IOException {
|
||||
super(dataset, storageID, currentDir, conf, storageType);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
|
||||
// Can't 'cache' replicas already in RAM.
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTransientStorage() {
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -77,7 +77,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
* dfs.datanode.fsdatasetcache.max.threads.per.volume) to limit resource
|
||||
* contention.
|
||||
*/
|
||||
private final ThreadPoolExecutor cacheExecutor;
|
||||
protected ThreadPoolExecutor cacheExecutor;
|
||||
|
||||
FsVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
|
||||
Configuration conf, StorageType storageType) throws IOException {
|
||||
|
@ -201,6 +201,11 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
return currentDir.getParent();
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTransientStorage() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String getPath(String bpid) throws IOException {
|
||||
return getBlockPoolSlice(bpid).getDirectory().getAbsolutePath();
|
||||
|
@ -324,7 +329,9 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
}
|
||||
|
||||
void shutdown() {
|
||||
cacheExecutor.shutdown();
|
||||
if (cacheExecutor != null) {
|
||||
cacheExecutor.shutdown();
|
||||
}
|
||||
Set<Entry<String, BlockPoolSlice>> set = bpSlices.entrySet();
|
||||
for (Entry<String, BlockPoolSlice> entry : set) {
|
||||
entry.getValue().shutdown();
|
||||
|
@ -417,6 +424,5 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
DatanodeStorage toDatanodeStorage() {
|
||||
return new DatanodeStorage(storageID, DatanodeStorage.State.NORMAL, storageType);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
|
||||
/**
|
||||
* Generate volumes based on the storageType.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class FsVolumeImplAllocator {
|
||||
static FsVolumeImpl createVolume(FsDatasetImpl fsDataset, String storageUuid,
|
||||
File dir, Configuration conf, StorageType storageType)
|
||||
throws IOException {
|
||||
switch(storageType) {
|
||||
case RAM_DISK:
|
||||
return new FsTransientVolumeImpl(
|
||||
fsDataset, storageUuid, dir, conf, storageType);
|
||||
default:
|
||||
return new FsVolumeImpl(
|
||||
fsDataset, storageUuid, dir, conf, storageType);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -68,7 +68,25 @@ class FsVolumeList {
|
|||
}
|
||||
return blockChooser.chooseVolume(list, blockSize);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Get next volume. Synchronized to ensure {@link #curVolume} is updated
|
||||
* by a single thread and next volume is chosen with no concurrent
|
||||
* update to {@link #volumes}.
|
||||
* @param blockSize free space needed on the volume
|
||||
* @return next volume to store the block in.
|
||||
*/
|
||||
synchronized FsVolumeImpl getNextTransientVolume(
|
||||
long blockSize) throws IOException {
|
||||
final List<FsVolumeImpl> list = new ArrayList<FsVolumeImpl>(volumes.size());
|
||||
for(FsVolumeImpl v : volumes) {
|
||||
if (v.isTransientStorage()) {
|
||||
list.add(v);
|
||||
}
|
||||
}
|
||||
return blockChooser.chooseVolume(list, blockSize);
|
||||
}
|
||||
|
||||
long getDfsUsed() throws IOException {
|
||||
long dfsUsed = 0L;
|
||||
for (FsVolumeImpl v : volumes) {
|
||||
|
|
|
@ -98,9 +98,10 @@ public class TestWriteBlockGetsBlockLengthHint {
|
|||
*/
|
||||
@Override
|
||||
public synchronized ReplicaInPipelineInterface createRbw(
|
||||
StorageType storageType, ExtendedBlock b) throws IOException {
|
||||
StorageType storageType, ExtendedBlock b, boolean allowLazyPersist)
|
||||
throws IOException {
|
||||
assertThat(b.getLocalBlock().getNumBytes(), is(EXPECTED_BLOCK_LENGTH));
|
||||
return super.createRbw(storageType, b);
|
||||
return super.createRbw(storageType, b, allowLazyPersist);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -364,7 +364,7 @@ public abstract class BlockReportTestBase {
|
|||
// Create a bogus new block which will not be present on the namenode.
|
||||
ExtendedBlock b = new ExtendedBlock(
|
||||
poolId, rand.nextLong(), 1024L, rand.nextLong());
|
||||
dn.getFSDataset().createRbw(StorageType.DEFAULT, b);
|
||||
dn.getFSDataset().createRbw(StorageType.DEFAULT, b, false);
|
||||
|
||||
DatanodeRegistration dnR = dn.getDNRegistrationForBP(poolId);
|
||||
StorageBlockReport[] reports = getBlockReports(dn, poolId, false, false);
|
||||
|
|
|
@ -300,6 +300,11 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
public ChunkChecksum getLastChecksumAndDataLen() {
|
||||
return new ChunkChecksum(oStream.getLength(), null);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOnTransientStorage() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -747,7 +752,8 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
|
||||
@Override // FsDatasetSpi
|
||||
public synchronized ReplicaInPipelineInterface createRbw(
|
||||
StorageType storageType, ExtendedBlock b) throws IOException {
|
||||
StorageType storageType, ExtendedBlock b,
|
||||
boolean allowLazyPersist) throws IOException {
|
||||
return createTemporary(storageType, b);
|
||||
}
|
||||
|
||||
|
@ -1083,7 +1089,7 @@ public class SimulatedFSDataset implements FsDatasetSpi<FsVolumeSpi> {
|
|||
|
||||
@Override
|
||||
public void checkAndUpdate(String bpid, long blockId, File diskFile,
|
||||
File diskMetaFile, FsVolumeSpi vol) {
|
||||
File diskMetaFile, FsVolumeSpi vol) throws IOException {
|
||||
throw new UnsupportedOperationException();
|
||||
}
|
||||
|
||||
|
|
|
@ -529,7 +529,7 @@ public class TestBlockRecovery {
|
|||
if(LOG.isDebugEnabled()) {
|
||||
LOG.debug("Running " + GenericTestUtils.getMethodName());
|
||||
}
|
||||
dn.data.createRbw(StorageType.DEFAULT, block);
|
||||
dn.data.createRbw(StorageType.DEFAULT, block, false);
|
||||
try {
|
||||
dn.syncBlock(rBlock, initBlockRecords(dn));
|
||||
fail("Sync should fail");
|
||||
|
@ -553,7 +553,7 @@ public class TestBlockRecovery {
|
|||
LOG.debug("Running " + GenericTestUtils.getMethodName());
|
||||
}
|
||||
ReplicaInPipelineInterface replicaInfo = dn.data.createRbw(
|
||||
StorageType.DEFAULT, block);
|
||||
StorageType.DEFAULT, block, false);
|
||||
ReplicaOutputStreams streams = null;
|
||||
try {
|
||||
streams = replicaInfo.createStreams(true,
|
||||
|
|
|
@ -215,7 +215,7 @@ public class TestDirectoryScanner {
|
|||
}
|
||||
|
||||
private void scan(long totalBlocks, int diffsize, long missingMetaFile, long missingBlockFile,
|
||||
long missingMemoryBlocks, long mismatchBlocks) {
|
||||
long missingMemoryBlocks, long mismatchBlocks) throws IOException {
|
||||
scanner.reconcile();
|
||||
|
||||
assertTrue(scanner.diffs.containsKey(bpid));
|
||||
|
@ -431,6 +431,10 @@ public class TestDirectoryScanner {
|
|||
|
||||
@Override
|
||||
public void releaseReservedSpace(long bytesToRelease) {
|
||||
|
||||
@Override
|
||||
public boolean isTransientStorage() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -67,7 +67,7 @@ public class TestSimulatedFSDataset {
|
|||
// we pass expected len as zero, - fsdataset should use the sizeof actual
|
||||
// data written
|
||||
ReplicaInPipelineInterface bInfo = fsdataset.createRbw(
|
||||
StorageType.DEFAULT, b);
|
||||
StorageType.DEFAULT, b, false);
|
||||
ReplicaOutputStreams out = bInfo.createStreams(true,
|
||||
DataChecksum.newDataChecksum(DataChecksum.Type.CRC32, 512));
|
||||
try {
|
||||
|
|
|
@ -34,7 +34,7 @@ public class FsDatasetTestUtil {
|
|||
|
||||
public static File getBlockFile(FsDatasetSpi<?> fsd, String bpid, Block b
|
||||
) throws IOException {
|
||||
return ((FsDatasetImpl)fsd).getBlockFile(bpid, b);
|
||||
return ((FsDatasetImpl)fsd).getBlockFile(bpid, b.getBlockId());
|
||||
}
|
||||
|
||||
public static File getMetaFile(FsDatasetSpi<?> fsd, String bpid, Block b)
|
||||
|
|
|
@ -358,7 +358,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[FINALIZED], false);
|
||||
Assert.fail("Should not have created a replica that's already " +
|
||||
"finalized " + blocks[FINALIZED]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -376,7 +376,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[TEMPORARY], false);
|
||||
Assert.fail("Should not have created a replica that had created as " +
|
||||
"temporary " + blocks[TEMPORARY]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -386,7 +386,7 @@ public class TestWriteToReplica {
|
|||
0L, blocks[RBW].getNumBytes()); // expect to be successful
|
||||
|
||||
try {
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RBW]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RBW], false);
|
||||
Assert.fail("Should not have created a replica that had created as RBW " +
|
||||
blocks[RBW]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -402,7 +402,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RWR]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RWR], false);
|
||||
Assert.fail("Should not have created a replica that was waiting to be " +
|
||||
"recovered " + blocks[RWR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -418,7 +418,7 @@ public class TestWriteToReplica {
|
|||
}
|
||||
|
||||
try {
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RUR]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[RUR], false);
|
||||
Assert.fail("Should not have created a replica that was under recovery " +
|
||||
blocks[RUR]);
|
||||
} catch (ReplicaAlreadyExistsException e) {
|
||||
|
@ -435,7 +435,7 @@ public class TestWriteToReplica {
|
|||
e.getMessage().contains(ReplicaNotFoundException.NON_EXISTENT_REPLICA));
|
||||
}
|
||||
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT]);
|
||||
dataSet.createRbw(StorageType.DEFAULT, blocks[NON_EXISTENT], false);
|
||||
}
|
||||
|
||||
private void testWriteToTemporary(FsDatasetImpl dataSet, ExtendedBlock[] blocks) throws IOException {
|
||||
|
|
Loading…
Reference in New Issue