HDFS-11163. Mover should move the file blocks to default storage once policy is unset. Contributed by Surendra Singh Lilhore.
This commit is contained in:
parent
62e4573efb
commit
23b1a7bdf1
|
@ -55,15 +55,25 @@ public class FsServerDefaults implements Writable {
|
||||||
private long trashInterval;
|
private long trashInterval;
|
||||||
private DataChecksum.Type checksumType;
|
private DataChecksum.Type checksumType;
|
||||||
private String keyProviderUri;
|
private String keyProviderUri;
|
||||||
|
private byte storagepolicyId;
|
||||||
|
|
||||||
public FsServerDefaults() {
|
public FsServerDefaults() {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public FsServerDefaults(long blockSize, int bytesPerChecksum,
|
||||||
|
int writePacketSize, short replication, int fileBufferSize,
|
||||||
|
boolean encryptDataTransfer, long trashInterval,
|
||||||
|
DataChecksum.Type checksumType, String keyProviderUri) {
|
||||||
|
this(blockSize, bytesPerChecksum, writePacketSize, replication,
|
||||||
|
fileBufferSize, encryptDataTransfer, trashInterval, checksumType,
|
||||||
|
keyProviderUri, (byte) 0);
|
||||||
|
}
|
||||||
|
|
||||||
public FsServerDefaults(long blockSize, int bytesPerChecksum,
|
public FsServerDefaults(long blockSize, int bytesPerChecksum,
|
||||||
int writePacketSize, short replication, int fileBufferSize,
|
int writePacketSize, short replication, int fileBufferSize,
|
||||||
boolean encryptDataTransfer, long trashInterval,
|
boolean encryptDataTransfer, long trashInterval,
|
||||||
DataChecksum.Type checksumType,
|
DataChecksum.Type checksumType,
|
||||||
String keyProviderUri) {
|
String keyProviderUri, byte storagepolicy) {
|
||||||
this.blockSize = blockSize;
|
this.blockSize = blockSize;
|
||||||
this.bytesPerChecksum = bytesPerChecksum;
|
this.bytesPerChecksum = bytesPerChecksum;
|
||||||
this.writePacketSize = writePacketSize;
|
this.writePacketSize = writePacketSize;
|
||||||
|
@ -73,6 +83,7 @@ public class FsServerDefaults implements Writable {
|
||||||
this.trashInterval = trashInterval;
|
this.trashInterval = trashInterval;
|
||||||
this.checksumType = checksumType;
|
this.checksumType = checksumType;
|
||||||
this.keyProviderUri = keyProviderUri;
|
this.keyProviderUri = keyProviderUri;
|
||||||
|
this.storagepolicyId = storagepolicy;
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getBlockSize() {
|
public long getBlockSize() {
|
||||||
|
@ -115,6 +126,10 @@ public class FsServerDefaults implements Writable {
|
||||||
return keyProviderUri;
|
return keyProviderUri;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public byte getDefaultStoragePolicyId() {
|
||||||
|
return storagepolicyId;
|
||||||
|
}
|
||||||
|
|
||||||
// /////////////////////////////////////////
|
// /////////////////////////////////////////
|
||||||
// Writable
|
// Writable
|
||||||
// /////////////////////////////////////////
|
// /////////////////////////////////////////
|
||||||
|
@ -127,6 +142,7 @@ public class FsServerDefaults implements Writable {
|
||||||
out.writeShort(replication);
|
out.writeShort(replication);
|
||||||
out.writeInt(fileBufferSize);
|
out.writeInt(fileBufferSize);
|
||||||
WritableUtils.writeEnum(out, checksumType);
|
WritableUtils.writeEnum(out, checksumType);
|
||||||
|
out.writeByte(storagepolicyId);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -138,5 +154,6 @@ public class FsServerDefaults implements Writable {
|
||||||
replication = in.readShort();
|
replication = in.readShort();
|
||||||
fileBufferSize = in.readInt();
|
fileBufferSize = in.readInt();
|
||||||
checksumType = WritableUtils.readEnum(in, DataChecksum.Type.class);
|
checksumType = WritableUtils.readEnum(in, DataChecksum.Type.class);
|
||||||
|
storagepolicyId = in.readByte();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -1759,7 +1759,8 @@ public class PBHelperClient {
|
||||||
fs.getEncryptDataTransfer(),
|
fs.getEncryptDataTransfer(),
|
||||||
fs.getTrashInterval(),
|
fs.getTrashInterval(),
|
||||||
convert(fs.getChecksumType()),
|
convert(fs.getChecksumType()),
|
||||||
fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null);
|
fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null,
|
||||||
|
(byte) fs.getPolicyId());
|
||||||
}
|
}
|
||||||
|
|
||||||
public static List<CryptoProtocolVersionProto> convert(
|
public static List<CryptoProtocolVersionProto> convert(
|
||||||
|
@ -1934,6 +1935,7 @@ public class PBHelperClient {
|
||||||
.setTrashInterval(fs.getTrashInterval())
|
.setTrashInterval(fs.getTrashInterval())
|
||||||
.setChecksumType(convert(fs.getChecksumType()))
|
.setChecksumType(convert(fs.getChecksumType()))
|
||||||
.setKeyProviderUri(fs.getKeyProviderUri())
|
.setKeyProviderUri(fs.getKeyProviderUri())
|
||||||
|
.setPolicyId(fs.getDefaultStoragePolicyId())
|
||||||
.build();
|
.build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -421,6 +421,7 @@ message FsServerDefaultsProto {
|
||||||
optional uint64 trashInterval = 7 [default = 0];
|
optional uint64 trashInterval = 7 [default = 0];
|
||||||
optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
|
optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
|
||||||
optional string keyProviderUri = 9;
|
optional string keyProviderUri = 9;
|
||||||
|
optional uint32 policyId = 10 [default = 0];
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -375,11 +375,16 @@ public class Mover {
|
||||||
/** @return true if it is necessary to run another round of migration */
|
/** @return true if it is necessary to run another round of migration */
|
||||||
private void processFile(String fullPath, HdfsLocatedFileStatus status,
|
private void processFile(String fullPath, HdfsLocatedFileStatus status,
|
||||||
Result result) {
|
Result result) {
|
||||||
final byte policyId = status.getStoragePolicy();
|
byte policyId = status.getStoragePolicy();
|
||||||
// currently we ignore files with unspecified storage policy
|
|
||||||
if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
|
if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
|
||||||
|
try {
|
||||||
|
// get default policy from namenode
|
||||||
|
policyId = dfs.getServerDefaults().getDefaultStoragePolicyId();
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Failed to get default policy for " + fullPath, e);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
}
|
||||||
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
|
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
|
||||||
if (policy == null) {
|
if (policy == null) {
|
||||||
LOG.warn("Failed to get the storage policy of file " + fullPath);
|
LOG.warn("Failed to get the storage policy of file " + fullPath);
|
||||||
|
|
|
@ -782,7 +782,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
checksumType,
|
checksumType,
|
||||||
conf.getTrimmed(
|
conf.getTrimmed(
|
||||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
|
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
|
||||||
""));
|
""),
|
||||||
|
blockManager.getStoragePolicySuite().getDefaultPolicy().getId());
|
||||||
|
|
||||||
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
|
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
|
||||||
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
|
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
|
||||||
|
|
|
@ -16,6 +16,7 @@
|
||||||
* limitations under the License.
|
* limitations under the License.
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs;
|
package org.apache.hadoop.hdfs;
|
||||||
|
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_DEFAULT;
|
||||||
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.IO_FILE_BUFFER_SIZE_KEY;
|
||||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_BLOCK_SIZE_DEFAULT;
|
||||||
|
@ -171,6 +172,7 @@ public class TestFileCreation {
|
||||||
assertEquals(DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT, serverDefaults.getWritePacketSize());
|
assertEquals(DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT, serverDefaults.getWritePacketSize());
|
||||||
assertEquals(DFS_REPLICATION_DEFAULT + 1, serverDefaults.getReplication());
|
assertEquals(DFS_REPLICATION_DEFAULT + 1, serverDefaults.getReplication());
|
||||||
assertEquals(IO_FILE_BUFFER_SIZE_DEFAULT, serverDefaults.getFileBufferSize());
|
assertEquals(IO_FILE_BUFFER_SIZE_DEFAULT, serverDefaults.getFileBufferSize());
|
||||||
|
assertEquals(7, serverDefaults.getDefaultStoragePolicyId());
|
||||||
} finally {
|
} finally {
|
||||||
fs.close();
|
fs.close();
|
||||||
cluster.shutdown();
|
cluster.shutdown();
|
||||||
|
|
|
@ -822,6 +822,48 @@ public class TestMover {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 300000)
|
||||||
|
public void testMoverWhenStoragePolicyUnset() throws Exception {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
initConf(conf);
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(1)
|
||||||
|
.storageTypes(
|
||||||
|
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}})
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||||
|
final String file = "/testMoverWhenStoragePolicyUnset";
|
||||||
|
// write to DISK
|
||||||
|
DFSTestUtil.createFile(dfs, new Path(file), 1L, (short) 1, 0L);
|
||||||
|
|
||||||
|
// move to ARCHIVE
|
||||||
|
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||||
|
int rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||||
|
new String[] {"-p", file.toString()});
|
||||||
|
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
|
||||||
|
|
||||||
|
// Wait till namenode notified about the block location details
|
||||||
|
waitForLocatedBlockWithArchiveStorageType(dfs, file, 1);
|
||||||
|
|
||||||
|
// verify before unset policy
|
||||||
|
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||||
|
Assert.assertTrue(StorageType.ARCHIVE == (lb.getStorageTypes())[0]);
|
||||||
|
|
||||||
|
// unset storage policy
|
||||||
|
dfs.unsetStoragePolicy(new Path(file));
|
||||||
|
rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||||
|
new String[] {"-p", file.toString()});
|
||||||
|
Assert.assertEquals("Movement to DISK should be successful", 0, rc);
|
||||||
|
|
||||||
|
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||||
|
Assert.assertTrue(StorageType.DISK == (lb.getStorageTypes())[0]);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private void createFileWithFavoredDatanodes(final Configuration conf,
|
private void createFileWithFavoredDatanodes(final Configuration conf,
|
||||||
final MiniDFSCluster cluster, final DistributedFileSystem dfs)
|
final MiniDFSCluster cluster, final DistributedFileSystem dfs)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
|
|
Loading…
Reference in New Issue