HDFS-7159. Use block storage policy to set lazy persist preference. (Arpit Agarwal)
This commit is contained in:
parent
368e2423a5
commit
bb84f1fccb
|
@ -38,7 +38,6 @@ public class FileStatus implements Writable, Comparable {
|
|||
private boolean isdir;
|
||||
private short block_replication;
|
||||
private long blocksize;
|
||||
private boolean isLazyPersist;
|
||||
private long modification_time;
|
||||
private long access_time;
|
||||
private FsPermission permission;
|
||||
|
@ -74,18 +73,6 @@ public class FileStatus implements Writable, Comparable {
|
|||
FsPermission permission, String owner, String group,
|
||||
Path symlink,
|
||||
Path path) {
|
||||
this(length, isdir, block_replication, blocksize, false,
|
||||
modification_time, access_time, permission, owner, group,
|
||||
symlink, path);
|
||||
}
|
||||
|
||||
public FileStatus(long length, boolean isdir,
|
||||
int block_replication,
|
||||
long blocksize, boolean isLazyPersist,
|
||||
long modification_time, long access_time,
|
||||
FsPermission permission, String owner, String group,
|
||||
Path symlink,
|
||||
Path path) {
|
||||
this.length = length;
|
||||
this.isdir = isdir;
|
||||
this.block_replication = (short)block_replication;
|
||||
|
@ -105,7 +92,6 @@ public class FileStatus implements Writable, Comparable {
|
|||
this.group = (group == null) ? "" : group;
|
||||
this.symlink = symlink;
|
||||
this.path = path;
|
||||
this.isLazyPersist = isLazyPersist;
|
||||
// The variables isdir and symlink indicate the type:
|
||||
// 1. isdir implies directory, in which case symlink must be null.
|
||||
// 2. !isdir implies a file or symlink, symlink != null implies a
|
||||
|
@ -181,13 +167,6 @@ public class FileStatus implements Writable, Comparable {
|
|||
return blocksize;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get whether the file is lazyPersist.
|
||||
*/
|
||||
public boolean isLazyPersist() {
|
||||
return isLazyPersist;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the replication factor of a file.
|
||||
* @return the replication factor of a file.
|
||||
|
|
|
@ -762,7 +762,6 @@ public class RawLocalFileSystem extends FileSystem {
|
|||
false,
|
||||
fs.getReplication(),
|
||||
fs.getBlockSize(),
|
||||
fs.isLazyPersist(),
|
||||
fs.getModificationTime(),
|
||||
fs.getAccessTime(),
|
||||
fs.getPermission(),
|
||||
|
@ -778,7 +777,7 @@ public class RawLocalFileSystem extends FileSystem {
|
|||
* when available.
|
||||
*/
|
||||
if (!target.isEmpty()) {
|
||||
return new FileStatus(0, false, 0, 0, false, 0, 0, FsPermission.getDefault(),
|
||||
return new FileStatus(0, false, 0, 0, 0, 0, FsPermission.getDefault(),
|
||||
"", "", new Path(target), f);
|
||||
}
|
||||
// f refers to a file or directory that does not exist
|
||||
|
|
|
@ -39,7 +39,6 @@ import org.apache.hadoop.fs.FileStatus;
|
|||
* %u: User name of owner
|
||||
* %y: UTC date as "yyyy-MM-dd HH:mm:ss"
|
||||
* %Y: Milliseconds since January 1, 1970 UTC
|
||||
* %l: Whether lazyPersist flag is set on the file.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@InterfaceStability.Unstable
|
||||
|
@ -54,8 +53,7 @@ class Stat extends FsCommand {
|
|||
public static final String DESCRIPTION =
|
||||
"Print statistics about the file/directory at <path> " +
|
||||
"in the specified format. Format accepts filesize in blocks (%b), group name of owner(%g), " +
|
||||
"filename (%n), block size (%o), replication (%r), user name of owner(%u), modification date (%y, %Y), " +
|
||||
"lazyPersist flag (%l)\n";
|
||||
"filename (%n), block size (%o), replication (%r), user name of owner(%u), modification date (%y, %Y)\n";
|
||||
|
||||
protected static final SimpleDateFormat timeFmt;
|
||||
static {
|
||||
|
@ -117,9 +115,6 @@ class Stat extends FsCommand {
|
|||
case 'Y':
|
||||
buf.append(stat.getModificationTime());
|
||||
break;
|
||||
case 'l':
|
||||
buf.append(stat.isLazyPersist());
|
||||
break;
|
||||
default:
|
||||
// this leaves %<unknown> alone, which causes the potential for
|
||||
// future format options to break strings; should use %% to
|
||||
|
|
|
@ -798,7 +798,7 @@
|
|||
</comparator>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^( |\t)*\(%y, %Y\), lazyPersist flag \(\%l\)( )*</expected-output>
|
||||
<expected-output>^( |\t)*\(%y, %Y\)( )*</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
|
|
@ -159,7 +159,6 @@ public class HttpFSFileSystem extends FileSystem
|
|||
public static final String XATTR_NAME_JSON = "name";
|
||||
public static final String XATTR_VALUE_JSON = "value";
|
||||
public static final String XATTRNAMES_JSON = "XAttrNames";
|
||||
public static final String LAZY_PERSIST_JSON = "LazyPersist";
|
||||
|
||||
public static final String FILE_CHECKSUM_JSON = "FileChecksum";
|
||||
public static final String CHECKSUM_ALGORITHM_JSON = "algorithm";
|
||||
|
@ -947,20 +946,19 @@ public class HttpFSFileSystem extends FileSystem
|
|||
long mTime = (Long) json.get(MODIFICATION_TIME_JSON);
|
||||
long blockSize = (Long) json.get(BLOCK_SIZE_JSON);
|
||||
short replication = ((Long) json.get(REPLICATION_JSON)).shortValue();
|
||||
boolean isLazyPersist = ((Boolean) json.get(LAZY_PERSIST_JSON)).booleanValue();
|
||||
FileStatus fileStatus = null;
|
||||
|
||||
switch (type) {
|
||||
case FILE:
|
||||
case DIRECTORY:
|
||||
fileStatus = new FileStatus(len, (type == FILE_TYPE.DIRECTORY),
|
||||
replication, blockSize, false, mTime, aTime,
|
||||
permission, owner, group, null, path);
|
||||
replication, blockSize, mTime, aTime,
|
||||
permission, owner, group, path);
|
||||
break;
|
||||
case SYMLINK:
|
||||
Path symLink = null;
|
||||
fileStatus = new FileStatus(len, false,
|
||||
replication, blockSize, isLazyPersist, mTime, aTime,
|
||||
replication, blockSize, mTime, aTime,
|
||||
permission, owner, group, symLink,
|
||||
path);
|
||||
}
|
||||
|
|
|
@ -125,7 +125,6 @@ public class FSOperations {
|
|||
fileStatus.getModificationTime());
|
||||
json.put(HttpFSFileSystem.BLOCK_SIZE_JSON, fileStatus.getBlockSize());
|
||||
json.put(HttpFSFileSystem.REPLICATION_JSON, fileStatus.getReplication());
|
||||
json.put(HttpFSFileSystem.LAZY_PERSIST_JSON, fileStatus.isLazyPersist());
|
||||
if ( (aclStatus != null) && !(aclStatus.getEntries().isEmpty()) ) {
|
||||
json.put(HttpFSFileSystem.ACL_BIT_JSON,true);
|
||||
}
|
||||
|
|
|
@ -86,3 +86,7 @@
|
|||
HDFS-7153. Add storagePolicy to NN edit log during file creation.
|
||||
(Arpit Agarwal)
|
||||
|
||||
HDFS-7159. Use block storage policy to set lazy persist preference.
|
||||
(Arpit Agarwal)
|
||||
|
||||
|
||||
|
|
|
@ -54,6 +54,7 @@ import org.apache.hadoop.fs.permission.FsPermission;
|
|||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream;
|
||||
import org.apache.hadoop.hdfs.client.HdfsDataOutputStream.SyncFlag;
|
||||
import org.apache.hadoop.crypto.CryptoProtocolVersion;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.DSQuotaExceededException;
|
||||
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
|
||||
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
|
||||
|
@ -75,6 +76,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status;
|
|||
import org.apache.hadoop.hdfs.protocolPB.PBHelper;
|
||||
import org.apache.hadoop.hdfs.security.token.block.BlockTokenIdentifier;
|
||||
import org.apache.hadoop.hdfs.security.token.block.InvalidBlockTokenException;
|
||||
import org.apache.hadoop.hdfs.server.blockmanagement.BlockStoragePolicySuite;
|
||||
import org.apache.hadoop.hdfs.server.datanode.CachingStrategy;
|
||||
import org.apache.hadoop.hdfs.server.namenode.NotReplicatedYetException;
|
||||
import org.apache.hadoop.hdfs.server.namenode.RetryStartFileException;
|
||||
|
@ -172,6 +174,8 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
private final AtomicReference<CachingStrategy> cachingStrategy;
|
||||
private boolean failPacket = false;
|
||||
private FileEncryptionInfo fileEncryptionInfo;
|
||||
private static final BlockStoragePolicySuite blockStoragePolicySuite =
|
||||
BlockStoragePolicySuite.createDefaultSuite();
|
||||
|
||||
private static class Packet {
|
||||
private static final long HEART_BEAT_SEQNO = -1L;
|
||||
|
@ -386,7 +390,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
*/
|
||||
private DataStreamer(HdfsFileStatus stat, Span span) {
|
||||
isAppend = false;
|
||||
isLazyPersistFile = stat.isLazyPersist();
|
||||
isLazyPersistFile = initLazyPersist(stat);
|
||||
stage = BlockConstructionStage.PIPELINE_SETUP_CREATE;
|
||||
traceSpan = span;
|
||||
}
|
||||
|
@ -406,7 +410,7 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
block = lastBlock.getBlock();
|
||||
bytesSent = block.getNumBytes();
|
||||
accessToken = lastBlock.getBlockToken();
|
||||
isLazyPersistFile = stat.isLazyPersist();
|
||||
isLazyPersistFile = initLazyPersist(stat);
|
||||
long usedInLastBlock = stat.getLen() % blockSize;
|
||||
int freeInLastBlock = (int)(blockSize - usedInLastBlock);
|
||||
|
||||
|
@ -450,6 +454,13 @@ public class DFSOutputStream extends FSOutputSummer
|
|||
}
|
||||
}
|
||||
|
||||
private boolean initLazyPersist(HdfsFileStatus stat) {
|
||||
final BlockStoragePolicy lpPolicy =
|
||||
blockStoragePolicySuite.getPolicy("LAZY_PERSIST");
|
||||
return lpPolicy != null &&
|
||||
stat.getStoragePolicy() == lpPolicy.getId();
|
||||
}
|
||||
|
||||
private void setPipeline(LocatedBlock lb) {
|
||||
setPipeline(lb.getLocations(), lb.getStorageTypes(), lb.getStorageIDs());
|
||||
}
|
||||
|
|
|
@ -47,7 +47,13 @@ public enum StorageType {
|
|||
|
||||
StorageType(boolean isTransient) { this.isTransient = isTransient; }
|
||||
|
||||
public boolean isMovable() { return isTransient == false; }
|
||||
public boolean isTransient() {
|
||||
return isTransient;
|
||||
}
|
||||
|
||||
public boolean isMovable() {
|
||||
return !isTransient;
|
||||
}
|
||||
|
||||
public static List<StorageType> asList() {
|
||||
return Arrays.asList(VALUES);
|
||||
|
|
|
@ -49,15 +49,29 @@ public class BlockStoragePolicy {
|
|||
private final StorageType[] creationFallbacks;
|
||||
/** The fallback storage type for replication. */
|
||||
private final StorageType[] replicationFallbacks;
|
||||
/**
|
||||
* Whether the policy is inherited during file creation.
|
||||
* If set then the policy cannot be changed after file creation.
|
||||
*/
|
||||
private boolean copyOnCreateFile;
|
||||
|
||||
@VisibleForTesting
|
||||
public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
|
||||
StorageType[] creationFallbacks, StorageType[] replicationFallbacks) {
|
||||
this(id, name, storageTypes, creationFallbacks, replicationFallbacks,
|
||||
false);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public BlockStoragePolicy(byte id, String name, StorageType[] storageTypes,
|
||||
StorageType[] creationFallbacks, StorageType[] replicationFallbacks,
|
||||
boolean copyOnCreateFile) {
|
||||
this.id = id;
|
||||
this.name = name;
|
||||
this.storageTypes = storageTypes;
|
||||
this.creationFallbacks = creationFallbacks;
|
||||
this.replicationFallbacks = replicationFallbacks;
|
||||
this.copyOnCreateFile = copyOnCreateFile;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -65,14 +79,23 @@ public class BlockStoragePolicy {
|
|||
*/
|
||||
public List<StorageType> chooseStorageTypes(final short replication) {
|
||||
final List<StorageType> types = new LinkedList<StorageType>();
|
||||
int i = 0;
|
||||
for(; i < replication && i < storageTypes.length; i++) {
|
||||
types.add(storageTypes[i]);
|
||||
int i = 0, j = 0;
|
||||
|
||||
// Do not return transient storage types. We will not have accurate
|
||||
// usage information for transient types.
|
||||
for (;i < replication && j < storageTypes.length; ++j) {
|
||||
if (!storageTypes[j].isTransient()) {
|
||||
types.add(storageTypes[j]);
|
||||
++i;
|
||||
}
|
||||
}
|
||||
|
||||
final StorageType last = storageTypes[storageTypes.length - 1];
|
||||
if (!last.isTransient()) {
|
||||
for (; i < replication; i++) {
|
||||
types.add(last);
|
||||
}
|
||||
}
|
||||
return types;
|
||||
}
|
||||
|
||||
|
@ -241,4 +264,8 @@ public class BlockStoragePolicy {
|
|||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
public boolean isCopyOnCreateFile() {
|
||||
return copyOnCreateFile;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -39,7 +39,6 @@ public class HdfsFileStatus {
|
|||
private final boolean isdir;
|
||||
private final short block_replication;
|
||||
private final long blocksize;
|
||||
private final boolean isLazyPersist;
|
||||
private final long modification_time;
|
||||
private final long access_time;
|
||||
private final FsPermission permission;
|
||||
|
@ -71,15 +70,14 @@ public class HdfsFileStatus {
|
|||
* @param feInfo the file's encryption info
|
||||
*/
|
||||
public HdfsFileStatus(long length, boolean isdir, int block_replication,
|
||||
long blocksize, boolean isLazyPersist, long modification_time,
|
||||
long access_time, FsPermission permission, String owner,
|
||||
String group, byte[] symlink, byte[] path, long fileId,
|
||||
int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy) {
|
||||
long blocksize, long modification_time, long access_time,
|
||||
FsPermission permission, String owner, String group, byte[] symlink,
|
||||
byte[] path, long fileId, int childrenNum, FileEncryptionInfo feInfo,
|
||||
byte storagePolicy) {
|
||||
this.length = length;
|
||||
this.isdir = isdir;
|
||||
this.block_replication = (short)block_replication;
|
||||
this.blocksize = blocksize;
|
||||
this.isLazyPersist = isLazyPersist;
|
||||
this.modification_time = modification_time;
|
||||
this.access_time = access_time;
|
||||
this.permission = (permission == null) ?
|
||||
|
@ -129,13 +127,6 @@ public class HdfsFileStatus {
|
|||
return blocksize;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return true if the file is lazyPersist.
|
||||
*/
|
||||
final public boolean isLazyPersist() {
|
||||
return isLazyPersist;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the replication factor of a file.
|
||||
* @return the replication factor of a file.
|
||||
|
@ -270,7 +261,7 @@ public class HdfsFileStatus {
|
|||
|
||||
public final FileStatus makeQualified(URI defaultUri, Path path) {
|
||||
return new FileStatus(getLen(), isDir(), getReplication(),
|
||||
getBlockSize(), isLazyPersist(), getModificationTime(),
|
||||
getBlockSize(), getModificationTime(),
|
||||
getAccessTime(),
|
||||
getPermission(), getOwner(), getGroup(),
|
||||
isSymlink() ? new Path(getSymlink()) : null,
|
||||
|
|
|
@ -55,14 +55,13 @@ public class HdfsLocatedFileStatus extends HdfsFileStatus {
|
|||
* @param feInfo file encryption info
|
||||
*/
|
||||
public HdfsLocatedFileStatus(long length, boolean isdir,
|
||||
int block_replication, long blocksize, boolean isLazyPersist,
|
||||
long modification_time, long access_time, FsPermission permission,
|
||||
String owner, String group, byte[] symlink, byte[] path, long fileId,
|
||||
LocatedBlocks locations, int childrenNum, FileEncryptionInfo feInfo,
|
||||
byte storagePolicy) {
|
||||
super(length, isdir, block_replication, blocksize, isLazyPersist,
|
||||
modification_time, access_time, permission, owner, group, symlink,
|
||||
path, fileId, childrenNum, feInfo, storagePolicy);
|
||||
int block_replication, long blocksize, long modification_time,
|
||||
long access_time, FsPermission permission, String owner, String group,
|
||||
byte[] symlink, byte[] path, long fileId, LocatedBlocks locations,
|
||||
int childrenNum, FileEncryptionInfo feInfo, byte storagePolicy) {
|
||||
super(length, isdir, block_replication, blocksize, modification_time,
|
||||
access_time, permission, owner, group, symlink, path, fileId,
|
||||
childrenNum, feInfo, storagePolicy);
|
||||
this.locations = locations;
|
||||
}
|
||||
|
||||
|
|
|
@ -60,7 +60,7 @@ public class SnapshottableDirectoryStatus {
|
|||
FsPermission permission, String owner, String group, byte[] localName,
|
||||
long inodeId, int childrenNum,
|
||||
int snapshotNumber, int snapshotQuota, byte[] parentFullPath) {
|
||||
this.dirStatus = new HdfsFileStatus(0, true, 0, 0, false, modification_time,
|
||||
this.dirStatus = new HdfsFileStatus(0, true, 0, 0, modification_time,
|
||||
access_time, permission, owner, group, null, localName, inodeId,
|
||||
childrenNum, null, BlockStoragePolicySuite.ID_UNSPECIFIED);
|
||||
this.snapshotNumber = snapshotNumber;
|
||||
|
|
|
@ -1410,7 +1410,6 @@ public class PBHelper {
|
|||
return new HdfsLocatedFileStatus(
|
||||
fs.getLength(), fs.getFileType().equals(FileType.IS_DIR),
|
||||
fs.getBlockReplication(), fs.getBlocksize(),
|
||||
fs.hasIsLazyPersist() ? fs.getIsLazyPersist() : false,
|
||||
fs.getModificationTime(), fs.getAccessTime(),
|
||||
PBHelper.convert(fs.getPermission()), fs.getOwner(), fs.getGroup(),
|
||||
fs.getFileType().equals(FileType.IS_SYMLINK) ?
|
||||
|
@ -1460,7 +1459,6 @@ public class PBHelper {
|
|||
setFileType(fType).
|
||||
setBlockReplication(fs.getReplication()).
|
||||
setBlocksize(fs.getBlockSize()).
|
||||
setIsLazyPersist(fs.isLazyPersist()).
|
||||
setModificationTime(fs.getModificationTime()).
|
||||
setAccessTime(fs.getAccessTime()).
|
||||
setPermission(PBHelper.convert(fs.getPermission())).
|
||||
|
|
|
@ -54,12 +54,6 @@ public interface BlockCollection {
|
|||
*/
|
||||
public long getPreferredBlockSize();
|
||||
|
||||
/**
|
||||
* Return true if the file was created with {@Link CreateFlag#LAZY_PERSIST}.
|
||||
* @return
|
||||
*/
|
||||
public boolean getLazyPersistFlag();
|
||||
|
||||
/**
|
||||
* Get block replication for the collection
|
||||
* @return block replication value
|
||||
|
|
|
@ -402,6 +402,10 @@ public class BlockManager {
|
|||
return storagePolicySuite.getPolicy(policyName);
|
||||
}
|
||||
|
||||
public BlockStoragePolicy getStoragePolicy(final byte policyId) {
|
||||
return storagePolicySuite.getPolicy(policyId);
|
||||
}
|
||||
|
||||
public BlockStoragePolicy[] getStoragePolicies() {
|
||||
return storagePolicySuite.getAllPolicies();
|
||||
}
|
||||
|
|
|
@ -44,6 +44,12 @@ public class BlockStoragePolicySuite {
|
|||
public static BlockStoragePolicySuite createDefaultSuite() {
|
||||
final BlockStoragePolicy[] policies =
|
||||
new BlockStoragePolicy[1 << ID_BIT_LENGTH];
|
||||
final byte lazyPersistId = 15;
|
||||
policies[lazyPersistId] = new BlockStoragePolicy(lazyPersistId, "LAZY_PERSIST",
|
||||
new StorageType[]{StorageType.RAM_DISK, StorageType.DISK},
|
||||
new StorageType[]{StorageType.DISK},
|
||||
new StorageType[]{StorageType.DISK},
|
||||
true); // Cannot be changed on regular files, but inherited.
|
||||
final byte hotId = 12;
|
||||
policies[hotId] = new BlockStoragePolicy(hotId, "HOT",
|
||||
new StorageType[]{StorageType.DISK}, StorageType.EMPTY_ARRAY,
|
||||
|
|
|
@ -294,7 +294,7 @@ class FsDatasetImpl implements FsDatasetSpi<FsVolumeImpl> {
|
|||
// If IOException raises from FsVolumeImpl() or getVolumeMap(), there is
|
||||
// nothing needed to be rolled back to make various data structures, e.g.,
|
||||
// storageMap and asyncDiskService, consistent.
|
||||
FsVolumeImpl fsVolume = FsVolumeImplAllocator.createVolume(
|
||||
FsVolumeImpl fsVolume = new FsVolumeImpl(
|
||||
this, sd.getStorageUuid(), dir, this.conf, storageType);
|
||||
ReplicaMap tempVolumeMap = new ReplicaMap(this);
|
||||
fsVolume.getVolumeMap(tempVolumeMap, ramDiskReplicaTracker);
|
||||
|
|
|
@ -1,60 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
|
||||
/**
|
||||
* Volume for storing replicas in memory. These can be deleted at any time
|
||||
* to make space for new replicas and there is no persistence guarantee.
|
||||
*
|
||||
* The backing store for these replicas is expected to be RAM_DISK.
|
||||
* The backing store may be disk when testing.
|
||||
*
|
||||
* It uses the {@link FsDatasetImpl} object for synchronization.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
@VisibleForTesting
|
||||
public class FsTransientVolumeImpl extends FsVolumeImpl {
|
||||
|
||||
|
||||
FsTransientVolumeImpl(FsDatasetImpl dataset, String storageID, File currentDir,
|
||||
Configuration conf, StorageType storageType)
|
||||
throws IOException {
|
||||
super(dataset, storageID, currentDir, conf, storageType);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
|
||||
// Can't 'cache' replicas already in RAM.
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isTransientStorage() {
|
||||
return true;
|
||||
}
|
||||
}
|
|
@ -96,6 +96,10 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
}
|
||||
|
||||
protected ThreadPoolExecutor initializeCacheExecutor(File parent) {
|
||||
if (storageType.isTransient()) {
|
||||
return null;
|
||||
}
|
||||
|
||||
final int maxNumThreads = dataset.datanode.getConf().getInt(
|
||||
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_KEY,
|
||||
DFSConfigKeys.DFS_DATANODE_FSDATASETCACHE_MAX_THREADS_PER_VOLUME_DEFAULT);
|
||||
|
@ -203,7 +207,7 @@ public class FsVolumeImpl implements FsVolumeSpi {
|
|||
|
||||
@Override
|
||||
public boolean isTransientStorage() {
|
||||
return false;
|
||||
return storageType.isTransient();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,44 +0,0 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.datanode.fsdataset.impl;
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hdfs.StorageType;
|
||||
|
||||
/**
|
||||
* Generate volumes based on the storageType.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class FsVolumeImplAllocator {
|
||||
static FsVolumeImpl createVolume(FsDatasetImpl fsDataset, String storageUuid,
|
||||
File dir, Configuration conf, StorageType storageType)
|
||||
throws IOException {
|
||||
switch(storageType) {
|
||||
case RAM_DISK:
|
||||
return new FsTransientVolumeImpl(
|
||||
fsDataset, storageUuid, dir, conf, storageType);
|
||||
default:
|
||||
return new FsVolumeImpl(
|
||||
fsDataset, storageUuid, dir, conf, storageType);
|
||||
}
|
||||
}
|
||||
}
|
|
@ -60,6 +60,7 @@ import org.apache.hadoop.hdfs.DistributedFileSystem;
|
|||
import org.apache.hadoop.hdfs.XAttrHelper;
|
||||
import org.apache.hadoop.hdfs.protocol.AclException;
|
||||
import org.apache.hadoop.hdfs.protocol.Block;
|
||||
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
|
||||
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
|
||||
import org.apache.hadoop.hdfs.protocol.EncryptionZone;
|
||||
|
@ -277,18 +278,18 @@ public class FSDirectory implements Closeable {
|
|||
}
|
||||
|
||||
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
||||
long mtime, long atime, short replication, long preferredBlockSize,
|
||||
boolean isLazyPersist) {
|
||||
return newINodeFile(id, permissions, mtime, atime, replication, preferredBlockSize,
|
||||
isLazyPersist, (byte)0);
|
||||
long mtime, long atime, short replication, long preferredBlockSize) {
|
||||
return new INodeFile(id, null, permissions, mtime, atime,
|
||||
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
|
||||
(byte) 0);
|
||||
}
|
||||
|
||||
private static INodeFile newINodeFile(long id, PermissionStatus permissions,
|
||||
long mtime, long atime, short replication, long preferredBlockSize,
|
||||
boolean isLazyPersist, byte storagePolicyId) {
|
||||
byte storagePolicyId) {
|
||||
return new INodeFile(id, null, permissions, mtime, atime,
|
||||
BlockInfo.EMPTY_ARRAY, replication, preferredBlockSize,
|
||||
isLazyPersist, storagePolicyId);
|
||||
storagePolicyId);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -300,15 +301,13 @@ public class FSDirectory implements Closeable {
|
|||
*/
|
||||
INodeFile addFile(String path, PermissionStatus permissions,
|
||||
short replication, long preferredBlockSize,
|
||||
boolean isLazyPersist,
|
||||
String clientName, String clientMachine)
|
||||
throws FileAlreadyExistsException, QuotaExceededException,
|
||||
UnresolvedLinkException, SnapshotAccessControlException, AclException {
|
||||
|
||||
long modTime = now();
|
||||
INodeFile newNode = newINodeFile(namesystem.allocateNewInodeId(),
|
||||
permissions, modTime, modTime, replication, preferredBlockSize,
|
||||
isLazyPersist);
|
||||
permissions, modTime, modTime, replication, preferredBlockSize);
|
||||
newNode.toUnderConstruction(clientName, clientMachine);
|
||||
|
||||
boolean added = false;
|
||||
|
@ -338,7 +337,6 @@ public class FSDirectory implements Closeable {
|
|||
long modificationTime,
|
||||
long atime,
|
||||
long preferredBlockSize,
|
||||
boolean isLazyPersist,
|
||||
boolean underConstruction,
|
||||
String clientName,
|
||||
String clientMachine,
|
||||
|
@ -347,12 +345,12 @@ public class FSDirectory implements Closeable {
|
|||
assert hasWriteLock();
|
||||
if (underConstruction) {
|
||||
newNode = newINodeFile(id, permissions, modificationTime,
|
||||
modificationTime, replication, preferredBlockSize, isLazyPersist, storagePolicyId);
|
||||
modificationTime, replication, preferredBlockSize, storagePolicyId);
|
||||
newNode.toUnderConstruction(clientName, clientMachine);
|
||||
|
||||
} else {
|
||||
newNode = newINodeFile(id, permissions, modificationTime, atime,
|
||||
replication, preferredBlockSize, isLazyPersist, storagePolicyId);
|
||||
replication, preferredBlockSize, storagePolicyId);
|
||||
}
|
||||
|
||||
try {
|
||||
|
@ -1040,6 +1038,20 @@ public class FSDirectory implements Closeable {
|
|||
}
|
||||
final int snapshotId = iip.getLatestSnapshotId();
|
||||
if (inode.isFile()) {
|
||||
BlockStoragePolicy newPolicy = getBlockManager().getStoragePolicy(policyId);
|
||||
if (newPolicy.isCopyOnCreateFile()) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Policy " + newPolicy + " cannot be set after file creation.");
|
||||
}
|
||||
|
||||
BlockStoragePolicy currentPolicy =
|
||||
getBlockManager().getStoragePolicy(inode.getLocalStoragePolicyID());
|
||||
|
||||
if (currentPolicy != null && currentPolicy.isCopyOnCreateFile()) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"Existing policy " + currentPolicy.getName() +
|
||||
" cannot be changed after file creation.");
|
||||
}
|
||||
inode.asFile().setStoragePolicyID(policyId, snapshotId);
|
||||
} else if (inode.isDirectory()) {
|
||||
setDirStoragePolicy(inode.asDirectory(), policyId, snapshotId);
|
||||
|
@ -1546,7 +1558,7 @@ public class FSDirectory implements Closeable {
|
|||
private HdfsFileStatus getFileInfo4DotSnapshot(String src)
|
||||
throws UnresolvedLinkException {
|
||||
if (getINode4DotSnapshot(src) != null) {
|
||||
return new HdfsFileStatus(0, true, 0, 0, false, 0, 0, null, null, null, null,
|
||||
return new HdfsFileStatus(0, true, 0, 0, 0, 0, null, null, null, null,
|
||||
HdfsFileStatus.EMPTY_NAME, -1L, 0, null,
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED);
|
||||
}
|
||||
|
@ -2406,7 +2418,6 @@ public class FSDirectory implements Closeable {
|
|||
long size = 0; // length is zero for directories
|
||||
short replication = 0;
|
||||
long blocksize = 0;
|
||||
boolean isLazyPersist = false;
|
||||
final boolean isEncrypted;
|
||||
|
||||
final FileEncryptionInfo feInfo = isRawPath ? null :
|
||||
|
@ -2417,7 +2428,6 @@ public class FSDirectory implements Closeable {
|
|||
size = fileNode.computeFileSize(snapshot);
|
||||
replication = fileNode.getFileReplication(snapshot);
|
||||
blocksize = fileNode.getPreferredBlockSize();
|
||||
isLazyPersist = fileNode.getLazyPersistFlag();
|
||||
isEncrypted = (feInfo != null) ||
|
||||
(isRawPath && isInAnEZ(INodesInPath.fromINode(node)));
|
||||
} else {
|
||||
|
@ -2432,7 +2442,6 @@ public class FSDirectory implements Closeable {
|
|||
node.isDirectory(),
|
||||
replication,
|
||||
blocksize,
|
||||
isLazyPersist,
|
||||
node.getModificationTime(snapshot),
|
||||
node.getAccessTime(snapshot),
|
||||
getPermissionForFileStatus(node, snapshot, isEncrypted),
|
||||
|
@ -2456,7 +2465,6 @@ public class FSDirectory implements Closeable {
|
|||
long size = 0; // length is zero for directories
|
||||
short replication = 0;
|
||||
long blocksize = 0;
|
||||
boolean isLazyPersist = false;
|
||||
LocatedBlocks loc = null;
|
||||
final boolean isEncrypted;
|
||||
final FileEncryptionInfo feInfo = isRawPath ? null :
|
||||
|
@ -2466,7 +2474,6 @@ public class FSDirectory implements Closeable {
|
|||
size = fileNode.computeFileSize(snapshot);
|
||||
replication = fileNode.getFileReplication(snapshot);
|
||||
blocksize = fileNode.getPreferredBlockSize();
|
||||
isLazyPersist = fileNode.getLazyPersistFlag();
|
||||
|
||||
final boolean inSnapshot = snapshot != Snapshot.CURRENT_STATE_ID;
|
||||
final boolean isUc = !inSnapshot && fileNode.isUnderConstruction();
|
||||
|
@ -2489,7 +2496,7 @@ public class FSDirectory implements Closeable {
|
|||
|
||||
HdfsLocatedFileStatus status =
|
||||
new HdfsLocatedFileStatus(size, node.isDirectory(), replication,
|
||||
blocksize, isLazyPersist, node.getModificationTime(snapshot),
|
||||
blocksize, node.getModificationTime(snapshot),
|
||||
node.getAccessTime(snapshot),
|
||||
getPermissionForFileStatus(node, snapshot, isEncrypted),
|
||||
node.getUserName(snapshot), node.getGroupName(snapshot),
|
||||
|
|
|
@ -714,7 +714,6 @@ public class FSEditLog implements LogsPurgeable {
|
|||
.setModificationTime(newNode.getModificationTime())
|
||||
.setAccessTime(newNode.getAccessTime())
|
||||
.setBlockSize(newNode.getPreferredBlockSize())
|
||||
.setLazyPersistFlag(newNode.getLazyPersistFlag())
|
||||
.setBlocks(newNode.getBlocks())
|
||||
.setPermissionStatus(permissions)
|
||||
.setClientName(newNode.getFileUnderConstructionFeature().getClientName())
|
||||
|
@ -747,7 +746,6 @@ public class FSEditLog implements LogsPurgeable {
|
|||
.setModificationTime(newNode.getModificationTime())
|
||||
.setAccessTime(newNode.getAccessTime())
|
||||
.setBlockSize(newNode.getPreferredBlockSize())
|
||||
.setLazyPersistFlag(newNode.getLazyPersistFlag())
|
||||
.setBlocks(newNode.getBlocks())
|
||||
.setPermissionStatus(newNode.getPermissionStatus());
|
||||
|
||||
|
|
|
@ -365,8 +365,7 @@ public class FSEditLogLoader {
|
|||
path, addCloseOp.permissions, addCloseOp.aclEntries,
|
||||
addCloseOp.xAttrs,
|
||||
replication, addCloseOp.mtime, addCloseOp.atime,
|
||||
addCloseOp.blockSize, addCloseOp.isLazyPersist,
|
||||
true, addCloseOp.clientName,
|
||||
addCloseOp.blockSize, true, addCloseOp.clientName,
|
||||
addCloseOp.clientMachine, addCloseOp.storagePolicyId);
|
||||
fsNamesys.leaseManager.addLease(addCloseOp.clientName, path);
|
||||
|
||||
|
|
|
@ -404,7 +404,6 @@ public abstract class FSEditLogOp {
|
|||
long mtime;
|
||||
long atime;
|
||||
long blockSize;
|
||||
boolean isLazyPersist;
|
||||
Block[] blocks;
|
||||
PermissionStatus permissions;
|
||||
List<AclEntry> aclEntries;
|
||||
|
@ -416,6 +415,7 @@ public abstract class FSEditLogOp {
|
|||
|
||||
private AddCloseOp(FSEditLogOpCodes opCode) {
|
||||
super(opCode);
|
||||
storagePolicyId = BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
assert(opCode == OP_ADD || opCode == OP_CLOSE);
|
||||
}
|
||||
|
||||
|
@ -454,11 +454,6 @@ public abstract class FSEditLogOp {
|
|||
return (T)this;
|
||||
}
|
||||
|
||||
<T extends AddCloseOp> T setLazyPersistFlag(boolean isLazyPersist) {
|
||||
this.isLazyPersist = isLazyPersist;
|
||||
return (T)this;
|
||||
}
|
||||
|
||||
<T extends AddCloseOp> T setBlocks(Block[] blocks) {
|
||||
if (blocks.length > MAX_BLOCKS) {
|
||||
throw new RuntimeException("Can't have more than " + MAX_BLOCKS +
|
||||
|
@ -516,7 +511,6 @@ public abstract class FSEditLogOp {
|
|||
FSImageSerialization.writeLong(mtime, out);
|
||||
FSImageSerialization.writeLong(atime, out);
|
||||
FSImageSerialization.writeLong(blockSize, out);
|
||||
FSImageSerialization.writeInt((isLazyPersist ? 1 : 0), out);
|
||||
new ArrayWritable(Block.class, blocks).write(out);
|
||||
permissions.write(out);
|
||||
|
||||
|
@ -586,13 +580,6 @@ public abstract class FSEditLogOp {
|
|||
this.blockSize = readLong(in);
|
||||
}
|
||||
|
||||
if (NameNodeLayoutVersion.supports(
|
||||
NameNodeLayoutVersion.Feature.LAZY_PERSIST_FILES, logVersion)) {
|
||||
this.isLazyPersist = (FSImageSerialization.readInt(in) != 0);
|
||||
} else {
|
||||
this.isLazyPersist = false;
|
||||
}
|
||||
|
||||
this.blocks = readBlocks(in, logVersion);
|
||||
this.permissions = PermissionStatus.read(in);
|
||||
|
||||
|
@ -658,8 +645,6 @@ public abstract class FSEditLogOp {
|
|||
builder.append(atime);
|
||||
builder.append(", blockSize=");
|
||||
builder.append(blockSize);
|
||||
builder.append(", lazyPersist");
|
||||
builder.append(isLazyPersist);
|
||||
builder.append(", blocks=");
|
||||
builder.append(Arrays.toString(blocks));
|
||||
builder.append(", permissions=");
|
||||
|
@ -700,8 +685,6 @@ public abstract class FSEditLogOp {
|
|||
Long.toString(atime));
|
||||
XMLUtils.addSaxString(contentHandler, "BLOCKSIZE",
|
||||
Long.toString(blockSize));
|
||||
XMLUtils.addSaxString(contentHandler, "LAZY_PERSIST",
|
||||
Boolean.toString(isLazyPersist));
|
||||
XMLUtils.addSaxString(contentHandler, "CLIENT_NAME", clientName);
|
||||
XMLUtils.addSaxString(contentHandler, "CLIENT_MACHINE", clientMachine);
|
||||
XMLUtils.addSaxString(contentHandler, "OVERWRITE",
|
||||
|
@ -728,10 +711,6 @@ public abstract class FSEditLogOp {
|
|||
this.atime = Long.parseLong(st.getValue("ATIME"));
|
||||
this.blockSize = Long.parseLong(st.getValue("BLOCKSIZE"));
|
||||
|
||||
String lazyPersistString = st.getValueOrNull("LAZY_PERSIST");
|
||||
this.isLazyPersist =
|
||||
lazyPersistString != null && Boolean.parseBoolean(lazyPersistString);
|
||||
|
||||
this.clientName = st.getValue("CLIENT_NAME");
|
||||
this.clientMachine = st.getValue("CLIENT_MACHINE");
|
||||
this.overwrite = Boolean.parseBoolean(st.getValueOrNull("OVERWRITE"));
|
||||
|
|
|
@ -784,8 +784,6 @@ public class FSImageFormat {
|
|||
counter.increment();
|
||||
}
|
||||
|
||||
// Images in the old format will not have the lazyPersist flag so it is
|
||||
// safe to pass false always.
|
||||
final INodeFile file = new INodeFile(inodeId, localName, permissions,
|
||||
modificationTime, atime, blocks, replication, blockSize);
|
||||
if (underConstruction) {
|
||||
|
@ -887,10 +885,8 @@ public class FSImageFormat {
|
|||
in.readShort());
|
||||
final long preferredBlockSize = in.readLong();
|
||||
|
||||
// LazyPersist flag will not be present in old image formats and hence
|
||||
// can be safely set to false always.
|
||||
return new INodeFileAttributes.SnapshotCopy(name, permissions, null, modificationTime,
|
||||
accessTime, replication, preferredBlockSize, false, (byte) 0, null);
|
||||
accessTime, replication, preferredBlockSize, (byte) 0, null);
|
||||
}
|
||||
|
||||
public INodeDirectoryAttributes loadINodeDirectoryAttributes(DataInput in)
|
||||
|
|
|
@ -291,7 +291,6 @@ public final class FSImageFormatPBINode {
|
|||
final INodeFile file = new INodeFile(n.getId(),
|
||||
n.getName().toByteArray(), permissions, f.getModificationTime(),
|
||||
f.getAccessTime(), blocks, replication, f.getPreferredBlockSize(),
|
||||
f.hasIsLazyPersist() ? f.getIsLazyPersist() : false,
|
||||
(byte)f.getStoragePolicyID());
|
||||
|
||||
if (f.hasAcl()) {
|
||||
|
@ -404,7 +403,6 @@ public final class FSImageFormatPBINode {
|
|||
.setPermission(buildPermissionStatus(file, state.getStringMap()))
|
||||
.setPreferredBlockSize(file.getPreferredBlockSize())
|
||||
.setReplication(file.getFileReplication())
|
||||
.setIsLazyPersist(file.getLazyPersistFlag())
|
||||
.setStoragePolicyID(file.getLocalStoragePolicyID());
|
||||
|
||||
AclFeature f = file.getAclFeature();
|
||||
|
|
|
@ -362,7 +362,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
Path symlink = stat.isSymlink() ? new Path(stat.getSymlink()) : null;
|
||||
Path path = dst != null ? new Path(dst) : new Path(src);
|
||||
status = new FileStatus(stat.getLen(), stat.isDir(),
|
||||
stat.getReplication(), stat.getBlockSize(), stat.isLazyPersist(),
|
||||
stat.getReplication(), stat.getBlockSize(),
|
||||
stat.getModificationTime(),
|
||||
stat.getAccessTime(), stat.getPermission(), stat.getOwner(),
|
||||
stat.getGroup(), symlink, path);
|
||||
|
@ -2340,6 +2340,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
|
||||
src = FSDirectory.resolvePath(src, pathComponents, dir);
|
||||
INode inode = dir.getINode(src);
|
||||
|
||||
// get the corresponding policy and make sure the policy name is valid
|
||||
BlockStoragePolicy policy = blockManager.getStoragePolicy(policyName);
|
||||
|
@ -2726,7 +2727,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
if (parent != null && mkdirsRecursively(parent.toString(),
|
||||
permissions, true, now())) {
|
||||
newNode = dir.addFile(src, permissions, replication, blockSize,
|
||||
isLazyPersist, holder, clientMachine);
|
||||
holder, clientMachine);
|
||||
}
|
||||
|
||||
if (newNode == null) {
|
||||
|
@ -2741,6 +2742,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
newNode = dir.getInode(newNode.getId()).asFile();
|
||||
}
|
||||
|
||||
setNewINodeStoragePolicy(newNode, iip, isLazyPersist);
|
||||
|
||||
// record file record in log, record new generation stamp
|
||||
getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
|
||||
if (NameNode.stateChangeLog.isDebugEnabled()) {
|
||||
|
@ -2755,6 +2758,37 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
}
|
||||
}
|
||||
|
||||
private void setNewINodeStoragePolicy(INodeFile inode,
|
||||
INodesInPath iip,
|
||||
boolean isLazyPersist)
|
||||
throws IOException {
|
||||
|
||||
if (isLazyPersist) {
|
||||
BlockStoragePolicy lpPolicy =
|
||||
blockManager.getStoragePolicy("LAZY_PERSIST");
|
||||
|
||||
// Set LAZY_PERSIST storage policy if the flag was passed to
|
||||
// CreateFile.
|
||||
if (lpPolicy == null) {
|
||||
throw new HadoopIllegalArgumentException(
|
||||
"The LAZY_PERSIST storage policy has been disabled " +
|
||||
"by the administrator.");
|
||||
}
|
||||
inode.setStoragePolicyID(lpPolicy.getId(),
|
||||
iip.getLatestSnapshotId());
|
||||
} else {
|
||||
BlockStoragePolicy effectivePolicy =
|
||||
blockManager.getStoragePolicy(inode.getStoragePolicyID());
|
||||
|
||||
if (effectivePolicy != null &&
|
||||
effectivePolicy.isCopyOnCreateFile()) {
|
||||
// Copy effective policy from ancestor directory to current file.
|
||||
inode.setStoragePolicyID(effectivePolicy.getId(),
|
||||
iip.getLatestSnapshotId());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Append to an existing file for append.
|
||||
* <p>
|
||||
|
@ -2794,8 +2828,11 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
+ src + " for client " + clientMachine);
|
||||
}
|
||||
INodeFile myFile = INodeFile.valueOf(inode, src, true);
|
||||
final BlockStoragePolicy lpPolicy =
|
||||
blockManager.getStoragePolicy("LAZY_PERSIST");
|
||||
|
||||
if (myFile.getLazyPersistFlag()) {
|
||||
if (lpPolicy != null &&
|
||||
lpPolicy.getId() == myFile.getStoragePolicyID()) {
|
||||
throw new UnsupportedOperationException(
|
||||
"Cannot append to lazy persist file " + src);
|
||||
}
|
||||
|
@ -5145,6 +5182,8 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
throws SafeModeException, AccessControlException,
|
||||
UnresolvedLinkException, IOException {
|
||||
|
||||
BlockStoragePolicy lpPolicy = blockManager.getStoragePolicy("LAZY_PERSIST");
|
||||
|
||||
List<BlockCollection> filesToDelete = new ArrayList<BlockCollection>();
|
||||
|
||||
writeLock();
|
||||
|
@ -5155,7 +5194,7 @@ public class FSNamesystem implements Namesystem, FSClusterStats,
|
|||
while (it.hasNext()) {
|
||||
Block b = it.next();
|
||||
BlockInfo blockInfo = blockManager.getStoredBlock(b);
|
||||
if (blockInfo.getBlockCollection().getLazyPersistFlag()) {
|
||||
if (blockInfo.getBlockCollection().getStoragePolicyID() == lpPolicy.getId()) {
|
||||
filesToDelete.add(blockInfo.getBlockCollection());
|
||||
}
|
||||
}
|
||||
|
|
|
@ -78,10 +78,9 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
*/
|
||||
static enum HeaderFormat {
|
||||
PREFERRED_BLOCK_SIZE(null, 48, 1),
|
||||
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 11, 1),
|
||||
REPLICATION(PREFERRED_BLOCK_SIZE.BITS, 12, 1),
|
||||
STORAGE_POLICY_ID(REPLICATION.BITS, BlockStoragePolicySuite.ID_BIT_LENGTH,
|
||||
0),
|
||||
LAZY_PERSIST(STORAGE_POLICY_ID.BITS, 1, 0);
|
||||
0);
|
||||
|
||||
private final LongBitFormat BITS;
|
||||
|
||||
|
@ -97,21 +96,16 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
return PREFERRED_BLOCK_SIZE.BITS.retrieve(header);
|
||||
}
|
||||
|
||||
static boolean getLazyPersistFlag(long header) {
|
||||
return LAZY_PERSIST.BITS.retrieve(header) == 0 ? false : true;
|
||||
}
|
||||
|
||||
static byte getStoragePolicyID(long header) {
|
||||
return (byte)STORAGE_POLICY_ID.BITS.retrieve(header);
|
||||
}
|
||||
|
||||
static long toLong(long preferredBlockSize, short replication,
|
||||
boolean isLazyPersist, byte storagePolicyID) {
|
||||
byte storagePolicyID) {
|
||||
long h = 0;
|
||||
h = PREFERRED_BLOCK_SIZE.BITS.combine(preferredBlockSize, h);
|
||||
h = REPLICATION.BITS.combine(replication, h);
|
||||
h = STORAGE_POLICY_ID.BITS.combine(storagePolicyID, h);
|
||||
h = LAZY_PERSIST.BITS.combine(isLazyPersist ? 1 : 0, h);
|
||||
return h;
|
||||
}
|
||||
|
||||
|
@ -125,15 +119,14 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
long atime, BlockInfo[] blklist, short replication,
|
||||
long preferredBlockSize) {
|
||||
this(id, name, permissions, mtime, atime, blklist, replication,
|
||||
preferredBlockSize, false, (byte) 0);
|
||||
preferredBlockSize, (byte) 0);
|
||||
}
|
||||
|
||||
INodeFile(long id, byte[] name, PermissionStatus permissions, long mtime,
|
||||
long atime, BlockInfo[] blklist, short replication,
|
||||
long preferredBlockSize, boolean isLazyPersist, byte storagePolicyID) {
|
||||
long preferredBlockSize, byte storagePolicyID) {
|
||||
super(id, name, permissions, mtime, atime);
|
||||
header = HeaderFormat.toLong(preferredBlockSize, replication,
|
||||
isLazyPersist, storagePolicyID);
|
||||
header = HeaderFormat.toLong(preferredBlockSize, replication, storagePolicyID);
|
||||
this.blocks = blklist;
|
||||
}
|
||||
|
||||
|
@ -381,11 +374,6 @@ public class INodeFile extends INodeWithAdditionalFields
|
|||
return HeaderFormat.getPreferredBlockSize(header);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getLazyPersistFlag() {
|
||||
return HeaderFormat.getLazyPersistFlag(header);
|
||||
}
|
||||
|
||||
@Override
|
||||
public byte getLocalStoragePolicyID() {
|
||||
return HeaderFormat.getStoragePolicyID(header);
|
||||
|
|
|
@ -32,8 +32,6 @@ public interface INodeFileAttributes extends INodeAttributes {
|
|||
/** @return preferred block size in bytes */
|
||||
public long getPreferredBlockSize();
|
||||
|
||||
public boolean getLazyPersistFlag();
|
||||
|
||||
/** @return the header as a long. */
|
||||
public long getHeaderLong();
|
||||
|
||||
|
@ -48,12 +46,12 @@ public interface INodeFileAttributes extends INodeAttributes {
|
|||
|
||||
public SnapshotCopy(byte[] name, PermissionStatus permissions,
|
||||
AclFeature aclFeature, long modificationTime, long accessTime,
|
||||
short replication, long preferredBlockSize, boolean isLazyPersist,
|
||||
short replication, long preferredBlockSize,
|
||||
byte storagePolicyID, XAttrFeature xAttrsFeature) {
|
||||
super(name, permissions, aclFeature, modificationTime, accessTime,
|
||||
xAttrsFeature);
|
||||
header = HeaderFormat.toLong(preferredBlockSize, replication,
|
||||
isLazyPersist, storagePolicyID);
|
||||
storagePolicyID);
|
||||
}
|
||||
|
||||
public SnapshotCopy(INodeFile file) {
|
||||
|
@ -71,9 +69,6 @@ public interface INodeFileAttributes extends INodeAttributes {
|
|||
return HeaderFormat.getPreferredBlockSize(header);
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean getLazyPersistFlag() { return HeaderFormat.getLazyPersistFlag(header); }
|
||||
|
||||
@Override
|
||||
public byte getLocalStoragePolicyID() {
|
||||
return HeaderFormat.getStoragePolicyID(header);
|
||||
|
|
|
@ -69,9 +69,7 @@ public class NameNodeLayoutVersion {
|
|||
CREATE_OVERWRITE(-58, "Use single editlog record for " +
|
||||
"creating file with overwrite"),
|
||||
XATTRS_NAMESPACE_EXT(-59, "Increase number of xattr namespaces"),
|
||||
BLOCK_STORAGE_POLICY(-60, "Block Storage policy"),
|
||||
LAZY_PERSIST_FILES(-60, "Support for optional lazy persistence of " +
|
||||
" files with reduced durability guarantees");
|
||||
BLOCK_STORAGE_POLICY(-60, "Block Storage policy");
|
||||
|
||||
private final FeatureInfo info;
|
||||
|
||||
|
|
|
@ -221,7 +221,6 @@ public class FSImageFormatPBSnapshot {
|
|||
.toByteArray(), permission, acl, fileInPb.getModificationTime(),
|
||||
fileInPb.getAccessTime(), (short) fileInPb.getReplication(),
|
||||
fileInPb.getPreferredBlockSize(),
|
||||
fileInPb.hasIsLazyPersist() ? fileInPb.getIsLazyPersist() : false,
|
||||
(byte)fileInPb.getStoragePolicyID(), xAttrs);
|
||||
}
|
||||
|
||||
|
|
|
@ -391,7 +391,6 @@ class FSImageLoader {
|
|||
f.getPermission(), stringTable);
|
||||
map.put("accessTime", f.getAccessTime());
|
||||
map.put("blockSize", f.getPreferredBlockSize());
|
||||
map.put("lazyPersist", f.getIsLazyPersist());
|
||||
map.put("group", p.getGroupName());
|
||||
map.put("length", getFileSize(f));
|
||||
map.put("modificationTime", f.getModificationTime());
|
||||
|
|
|
@ -247,10 +247,6 @@ public final class PBImageXmlWriter {
|
|||
.o("perferredBlockSize", f.getPreferredBlockSize())
|
||||
.o("permission", dumpPermission(f.getPermission()));
|
||||
|
||||
if (f.hasIsLazyPersist()) {
|
||||
o("lazyPersist", f.getIsLazyPersist());
|
||||
}
|
||||
|
||||
if (f.getBlocksCount() > 0) {
|
||||
out.print("<blocks>");
|
||||
for (BlockProto b : f.getBlocksList()) {
|
||||
|
|
|
@ -269,7 +269,7 @@ public class JsonUtil {
|
|||
(byte) (long) (Long) m.get("storagePolicy") :
|
||||
BlockStoragePolicySuite.ID_UNSPECIFIED;
|
||||
return new HdfsFileStatus(len, type == PathType.DIRECTORY, replication,
|
||||
blockSize, isLazyPersist, mTime, aTime, permission, owner, group,
|
||||
blockSize, mTime, aTime, permission, owner, group,
|
||||
symlink, DFSUtil.string2Bytes(localName), fileId, childrenNum, null,
|
||||
storagePolicy);
|
||||
}
|
||||
|
|
|
@ -139,7 +139,6 @@ message INodeSection {
|
|||
optional AclFeatureProto acl = 8;
|
||||
optional XAttrFeatureProto xAttrs = 9;
|
||||
optional uint32 storagePolicyID = 10;
|
||||
optional bool isLazyPersist = 11 [default = false];
|
||||
}
|
||||
|
||||
message INodeDirectory {
|
||||
|
|
|
@ -310,7 +310,6 @@ message HdfsFileStatusProto {
|
|||
optional FileEncryptionInfoProto fileEncryptionInfo = 15;
|
||||
|
||||
optional uint32 storagePolicy = 16 [default = 0]; // block storage policy id
|
||||
optional bool isLazyPersist = 17 [default = false];
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -254,12 +254,12 @@ public class TestDFSClientRetries {
|
|||
anyLong(), any(String[].class))).thenAnswer(answer);
|
||||
|
||||
Mockito.doReturn(
|
||||
new HdfsFileStatus(0, false, 1, 1024, false, 0, 0, new FsPermission(
|
||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||
(short) 777), "owner", "group", new byte[0], new byte[0],
|
||||
1010, 0, null, (byte) 0)).when(mockNN).getFileInfo(anyString());
|
||||
|
||||
Mockito.doReturn(
|
||||
new HdfsFileStatus(0, false, 1, 1024, false, 0, 0, new FsPermission(
|
||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||
(short) 777), "owner", "group", new byte[0], new byte[0],
|
||||
1010, 0, null, (byte) 0))
|
||||
.when(mockNN)
|
||||
|
|
|
@ -716,7 +716,7 @@ public class TestEncryptionZones {
|
|||
private static void mockCreate(ClientProtocol mcp,
|
||||
CipherSuite suite, CryptoProtocolVersion version) throws Exception {
|
||||
Mockito.doReturn(
|
||||
new HdfsFileStatus(0, false, 1, 1024, false, 0, 0, new FsPermission(
|
||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||
(short) 777), "owner", "group", new byte[0], new byte[0],
|
||||
1010, 0, new FileEncryptionInfo(suite,
|
||||
version, new byte[suite.getAlgorithmBlockSize()],
|
||||
|
|
|
@ -342,12 +342,12 @@ public class TestLease {
|
|||
}
|
||||
|
||||
Mockito.doReturn(
|
||||
new HdfsFileStatus(0, false, 1, 1024, false, 0, 0, new FsPermission(
|
||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||
(short) 777), "owner", "group", new byte[0], new byte[0],
|
||||
1010, 0, null, (byte) 0)).when(mcp).getFileInfo(anyString());
|
||||
Mockito
|
||||
.doReturn(
|
||||
new HdfsFileStatus(0, false, 1, 1024, false, 0, 0, new FsPermission(
|
||||
new HdfsFileStatus(0, false, 1, 1024, 0, 0, new FsPermission(
|
||||
(short) 777), "owner", "group", new byte[0], new byte[0],
|
||||
1010, 0, null, (byte) 0))
|
||||
.when(mcp)
|
||||
|
|
|
@ -56,6 +56,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.*;
|
|||
import static org.apache.hadoop.hdfs.StorageType.DEFAULT;
|
||||
import static org.apache.hadoop.hdfs.StorageType.RAM_DISK;
|
||||
import static org.hamcrest.core.Is.is;
|
||||
import static org.hamcrest.core.IsNot.not;
|
||||
import static org.junit.Assert.assertThat;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
|
@ -68,6 +69,8 @@ public class TestLazyPersistFiles {
|
|||
((Log4JLogger) FsDatasetImpl.LOG).getLogger().setLevel(Level.ALL);
|
||||
}
|
||||
|
||||
private static final byte LAZY_PERSIST_POLICY_ID = (byte) 15;
|
||||
|
||||
private static final int THREADPOOL_SIZE = 10;
|
||||
|
||||
private static final short REPL_FACTOR = 1;
|
||||
|
@ -100,19 +103,20 @@ public class TestLazyPersistFiles {
|
|||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
public void testFlagNotSetByDefault() throws IOException {
|
||||
public void testPolicyNotSetByDefault() throws IOException {
|
||||
startUpCluster(false, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||
|
||||
makeTestFile(path, 0, false);
|
||||
// Stat the file and check that the lazyPersist flag is returned back.
|
||||
// Stat the file and check that the LAZY_PERSIST policy is not
|
||||
// returned back.
|
||||
HdfsFileStatus status = client.getFileInfo(path.toString());
|
||||
assertThat(status.isLazyPersist(), is(false));
|
||||
assertThat(status.getStoragePolicy(), not(LAZY_PERSIST_POLICY_ID));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
public void testFlagPropagation() throws IOException {
|
||||
public void testPolicyPropagation() throws IOException {
|
||||
startUpCluster(false, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||
|
@ -120,11 +124,11 @@ public class TestLazyPersistFiles {
|
|||
makeTestFile(path, 0, true);
|
||||
// Stat the file and check that the lazyPersist flag is returned back.
|
||||
HdfsFileStatus status = client.getFileInfo(path.toString());
|
||||
assertThat(status.isLazyPersist(), is(true));
|
||||
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
public void testFlagPersistenceInEditLog() throws IOException {
|
||||
public void testPolicyPersistenceInEditLog() throws IOException {
|
||||
startUpCluster(false, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||
|
@ -134,11 +138,11 @@ public class TestLazyPersistFiles {
|
|||
|
||||
// Stat the file and check that the lazyPersist flag is returned back.
|
||||
HdfsFileStatus status = client.getFileInfo(path.toString());
|
||||
assertThat(status.isLazyPersist(), is(true));
|
||||
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
public void testFlagPersistenceInFsImage() throws IOException {
|
||||
public void testPolicyPersistenceInFsImage() throws IOException {
|
||||
startUpCluster(false, -1);
|
||||
final String METHOD_NAME = GenericTestUtils.getMethodName();
|
||||
Path path = new Path("/" + METHOD_NAME + ".dat");
|
||||
|
@ -152,7 +156,7 @@ public class TestLazyPersistFiles {
|
|||
|
||||
// Stat the file and check that the lazyPersist flag is returned back.
|
||||
HdfsFileStatus status = client.getFileInfo(path.toString());
|
||||
assertThat(status.isLazyPersist(), is(true));
|
||||
assertThat(status.getStoragePolicy(), is(LAZY_PERSIST_POLICY_ID));
|
||||
}
|
||||
|
||||
@Test (timeout=300000)
|
||||
|
|
|
@ -255,7 +255,7 @@ public class TestScrLazyPersistFiles {
|
|||
|
||||
for (FsVolumeSpi volume : volumes) {
|
||||
if (volume.getStorageType() == RAM_DISK) {
|
||||
((FsTransientVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
|
||||
((FsVolumeImpl) volume).setCapacityForTesting(ramDiskStorageLimit);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -263,13 +263,6 @@ public class TestScrLazyPersistFiles {
|
|||
LOG.info("Cluster startup complete");
|
||||
}
|
||||
|
||||
private void startUpCluster(final int numDataNodes,
|
||||
final StorageType[] storageTypes,
|
||||
final long ramDiskStorageLimit)
|
||||
throws IOException {
|
||||
startUpCluster(numDataNodes, storageTypes, ramDiskStorageLimit, false);
|
||||
}
|
||||
|
||||
private void makeTestFile(Path path, long length, final boolean isLazyPersist)
|
||||
throws IOException {
|
||||
|
||||
|
@ -324,14 +317,6 @@ public class TestScrLazyPersistFiles {
|
|||
BLOCK_SIZE, REPL_FACTOR, seed, true);
|
||||
}
|
||||
|
||||
private boolean verifyReadRandomFile(
|
||||
Path path, int fileLength, int seed) throws IOException {
|
||||
byte contents[] = DFSTestUtil.readFileBuffer(fs, path);
|
||||
byte expected[] = DFSTestUtil.
|
||||
calculateFileContentsFromSeed(seed, fileLength);
|
||||
return Arrays.equals(contents, expected);
|
||||
}
|
||||
|
||||
private void triggerBlockReport()
|
||||
throws IOException, InterruptedException {
|
||||
// Trigger block report to NN
|
||||
|
|
|
@ -1018,7 +1018,7 @@ public class TestFsck {
|
|||
byte storagePolicy = 0;
|
||||
|
||||
HdfsFileStatus file = new HdfsFileStatus(length, isDir, blockReplication,
|
||||
blockSize, false, modTime, accessTime, perms, owner, group, symlink,
|
||||
blockSize, modTime, accessTime, perms, owner, group, symlink,
|
||||
path, fileId, numChildren, null, storagePolicy);
|
||||
Result res = new Result(conf);
|
||||
|
||||
|
|
|
@ -87,7 +87,7 @@ public class TestINodeFile {
|
|||
|
||||
private static INodeFile createINodeFile(byte storagePolicyID) {
|
||||
return new INodeFile(INodeId.GRANDFATHER_INODE_ID, null, perm, 0L, 0L,
|
||||
null, (short)3, 1024L, false, storagePolicyID);
|
||||
null, (short)3, 1024L, storagePolicyID);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
|
|
@ -62,7 +62,7 @@ public class TestJsonUtil {
|
|||
final long now = Time.now();
|
||||
final String parent = "/dir";
|
||||
final HdfsFileStatus status = new HdfsFileStatus(1001L, false, 3, 1L << 26,
|
||||
false, now, now + 10, new FsPermission((short) 0644), "user", "group",
|
||||
now, now + 10, new FsPermission((short) 0644), "user", "group",
|
||||
DFSUtil.string2Bytes("bar"), DFSUtil.string2Bytes("foo"),
|
||||
INodeId.GRANDFATHER_INODE_ID, 0, null, (byte) 0);
|
||||
final FileStatus fstatus = toFileStatus(status, parent);
|
||||
|
|
|
@ -4554,42 +4554,6 @@
|
|||
</comparators>
|
||||
</test>
|
||||
|
||||
<test> <!-- TESTED -->
|
||||
<description>put: The LazyPersist flag is set when requested</description>
|
||||
<test-commands>
|
||||
<command>-fs NAMENODE -mkdir /dirLp</command>
|
||||
<command>-fs NAMENODE -put -l CLITEST_DATA/data15bytes /dirLp/data15bytes</command>
|
||||
<command>-fs NAMENODE -stat %l /dirLp/data15bytes</command>
|
||||
</test-commands>
|
||||
<cleanup-commands>
|
||||
<command>-fs NAMENODE -rm -r /dirLp/</command>
|
||||
</cleanup-commands>
|
||||
<comparators>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^true</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
||||
<test> <!-- TESTED -->
|
||||
<description>put: The LazyPersist flag is not set by default</description>
|
||||
<test-commands>
|
||||
<command>-fs NAMENODE -mkdir /dirLp</command>
|
||||
<command>-fs NAMENODE -put CLITEST_DATA/data15bytes /dirLp/data15bytes</command>
|
||||
<command>-fs NAMENODE -stat %l /dirLp/data15bytes</command>
|
||||
</test-commands>
|
||||
<cleanup-commands>
|
||||
<command>-fs NAMENODE -rm -r /dirLp/</command>
|
||||
</cleanup-commands>
|
||||
<comparators>
|
||||
<comparator>
|
||||
<type>RegexpComparator</type>
|
||||
<expected-output>^false</expected-output>
|
||||
</comparator>
|
||||
</comparators>
|
||||
</test>
|
||||
|
||||
<!-- Tests for copyFromLocal -->
|
||||
<test> <!-- TESTED -->
|
||||
<description>copyFromLocal: copying file into a file (absolute path)</description>
|
||||
|
|
Loading…
Reference in New Issue