HDFS-11163. Mover should move the file blocks to default storage once policy is unset. Contributed by Surendra Singh Lilhore.
(cherry picked from commit 00ed21a6fedb45a7c8992b8d45adaa83f14af34c)
(cherry picked from commit d5e2bd4096
)
This commit is contained in:
parent
32475a7b6c
commit
c4bf504395
|
@ -55,6 +55,7 @@ public class FsServerDefaults implements Writable {
|
|||
private long trashInterval;
|
||||
private DataChecksum.Type checksumType;
|
||||
private String keyProviderUri;
|
||||
private byte storagepolicyId;
|
||||
|
||||
public FsServerDefaults() {
|
||||
}
|
||||
|
@ -62,8 +63,17 @@ public class FsServerDefaults implements Writable {
|
|||
public FsServerDefaults(long blockSize, int bytesPerChecksum,
|
||||
int writePacketSize, short replication, int fileBufferSize,
|
||||
boolean encryptDataTransfer, long trashInterval,
|
||||
DataChecksum.Type checksumType,
|
||||
String keyProviderUri) {
|
||||
DataChecksum.Type checksumType, String keyProviderUri) {
|
||||
this(blockSize, bytesPerChecksum, writePacketSize, replication,
|
||||
fileBufferSize, encryptDataTransfer, trashInterval, checksumType,
|
||||
keyProviderUri, (byte) 0);
|
||||
}
|
||||
|
||||
public FsServerDefaults(long blockSize, int bytesPerChecksum,
|
||||
int writePacketSize, short replication, int fileBufferSize,
|
||||
boolean encryptDataTransfer, long trashInterval,
|
||||
DataChecksum.Type checksumType, String keyProviderUri,
|
||||
byte storagepolicy) {
|
||||
this.blockSize = blockSize;
|
||||
this.bytesPerChecksum = bytesPerChecksum;
|
||||
this.writePacketSize = writePacketSize;
|
||||
|
@ -73,6 +83,7 @@ public class FsServerDefaults implements Writable {
|
|||
this.trashInterval = trashInterval;
|
||||
this.checksumType = checksumType;
|
||||
this.keyProviderUri = keyProviderUri;
|
||||
this.storagepolicyId = storagepolicy;
|
||||
}
|
||||
|
||||
public long getBlockSize() {
|
||||
|
@ -115,6 +126,10 @@ public class FsServerDefaults implements Writable {
|
|||
return keyProviderUri;
|
||||
}
|
||||
|
||||
public byte getDefaultStoragePolicyId() {
|
||||
return storagepolicyId;
|
||||
}
|
||||
|
||||
// /////////////////////////////////////////
|
||||
// Writable
|
||||
// /////////////////////////////////////////
|
||||
|
@ -127,6 +142,7 @@ public class FsServerDefaults implements Writable {
|
|||
out.writeShort(replication);
|
||||
out.writeInt(fileBufferSize);
|
||||
WritableUtils.writeEnum(out, checksumType);
|
||||
out.writeByte(storagepolicyId);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -138,5 +154,6 @@ public class FsServerDefaults implements Writable {
|
|||
replication = in.readShort();
|
||||
fileBufferSize = in.readInt();
|
||||
checksumType = WritableUtils.readEnum(in, DataChecksum.Type.class);
|
||||
storagepolicyId = in.readByte();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -1592,7 +1592,8 @@ public class PBHelperClient {
|
|||
fs.getEncryptDataTransfer(),
|
||||
fs.getTrashInterval(),
|
||||
convert(fs.getChecksumType()),
|
||||
fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null);
|
||||
fs.hasKeyProviderUri() ? fs.getKeyProviderUri() : null,
|
||||
(byte) fs.getPolicyId());
|
||||
}
|
||||
|
||||
public static List<CryptoProtocolVersionProto> convert(
|
||||
|
@ -1744,6 +1745,7 @@ public class PBHelperClient {
|
|||
.setTrashInterval(fs.getTrashInterval())
|
||||
.setChecksumType(convert(fs.getChecksumType()))
|
||||
.setKeyProviderUri(fs.getKeyProviderUri())
|
||||
.setPolicyId(fs.getDefaultStoragePolicyId())
|
||||
.build();
|
||||
}
|
||||
|
||||
|
|
|
@ -374,6 +374,7 @@ message FsServerDefaultsProto {
|
|||
optional uint64 trashInterval = 7 [default = 0];
|
||||
optional ChecksumTypeProto checksumType = 8 [default = CHECKSUM_CRC32];
|
||||
optional string keyProviderUri = 9;
|
||||
optional uint32 policyId = 10 [default = 0];
|
||||
}
|
||||
|
||||
|
||||
|
|
|
@ -351,10 +351,15 @@ public class Mover {
|
|||
/** @return true if it is necessary to run another round of migration */
|
||||
private void processFile(String fullPath, HdfsLocatedFileStatus status,
|
||||
Result result) {
|
||||
final byte policyId = status.getStoragePolicy();
|
||||
// currently we ignore files with unspecified storage policy
|
||||
byte policyId = status.getStoragePolicy();
|
||||
if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
|
||||
return;
|
||||
try {
|
||||
// get default policy from namenode
|
||||
policyId = dfs.getServerDefaults().getDefaultStoragePolicyId();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Failed to get default policy for " + fullPath, e);
|
||||
return;
|
||||
}
|
||||
}
|
||||
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
|
||||
if (policy == null) {
|
||||
|
|
|
@ -792,8 +792,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
|||
conf.getLong(FS_TRASH_INTERVAL_KEY, FS_TRASH_INTERVAL_DEFAULT),
|
||||
checksumType,
|
||||
conf.getTrimmed(
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
|
||||
""));
|
||||
CommonConfigurationKeysPublic.HADOOP_SECURITY_KEY_PROVIDER_PATH,
|
||||
""),
|
||||
blockManager.getStoragePolicySuite().getDefaultPolicy().getId());
|
||||
|
||||
this.maxFsObjects = conf.getLong(DFS_NAMENODE_MAX_OBJECTS_KEY,
|
||||
DFS_NAMENODE_MAX_OBJECTS_DEFAULT);
|
||||
|
|
|
@ -171,6 +171,7 @@ public class TestFileCreation {
|
|||
assertEquals(DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT, serverDefaults.getWritePacketSize());
|
||||
assertEquals(DFS_REPLICATION_DEFAULT + 1, serverDefaults.getReplication());
|
||||
assertEquals(IO_FILE_BUFFER_SIZE_DEFAULT, serverDefaults.getFileBufferSize());
|
||||
assertEquals(7, serverDefaults.getDefaultStoragePolicyId());
|
||||
} finally {
|
||||
fs.close();
|
||||
cluster.shutdown();
|
||||
|
|
|
@ -22,7 +22,9 @@ import java.net.URI;
|
|||
import java.util.*;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import com.google.common.base.Supplier;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FSDataOutputStream;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
|
@ -45,9 +47,12 @@ import org.apache.hadoop.test.GenericTestUtils;
|
|||
import org.apache.hadoop.util.ToolRunner;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
public class TestMover {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(TestMover.class);
|
||||
static final int DEFAULT_BLOCK_SIZE = 100;
|
||||
|
||||
static {
|
||||
|
@ -409,4 +414,73 @@ public class TestMover {
|
|||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
@Test(timeout = 300000)
|
||||
public void testMoverWhenStoragePolicyUnset() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(1)
|
||||
.storageTypes(
|
||||
new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE}})
|
||||
.build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
final DistributedFileSystem dfs = cluster.getFileSystem();
|
||||
final String file = "/testMoverWhenStoragePolicyUnset";
|
||||
// write to DISK
|
||||
DFSTestUtil.createFile(dfs, new Path(file), 1L, (short) 1, 0L);
|
||||
|
||||
// move to ARCHIVE
|
||||
dfs.setStoragePolicy(new Path(file), "COLD");
|
||||
int rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||
new String[] {"-p", file.toString()});
|
||||
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
|
||||
|
||||
// Wait till namenode notified about the block location details
|
||||
waitForLocatedBlockWithArchiveStorageType(dfs, file, 1);
|
||||
|
||||
// verify before unset policy
|
||||
LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||
Assert.assertTrue(StorageType.ARCHIVE == (lb.getStorageTypes())[0]);
|
||||
|
||||
// unset storage policy
|
||||
dfs.unsetStoragePolicy(new Path(file));
|
||||
rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||
new String[] {"-p", file.toString()});
|
||||
Assert.assertEquals("Movement to DISK should be successful", 0, rc);
|
||||
|
||||
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||
Assert.assertTrue(StorageType.DISK == (lb.getStorageTypes())[0]);
|
||||
} finally {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
private void waitForLocatedBlockWithArchiveStorageType(
|
||||
final DistributedFileSystem dfs, final String file,
|
||||
final int expectedArchiveCount) throws Exception {
|
||||
GenericTestUtils.waitFor(new Supplier<Boolean>() {
|
||||
@Override
|
||||
public Boolean get() {
|
||||
LocatedBlock lb = null;
|
||||
try {
|
||||
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Exception while getting located blocks", e);
|
||||
return false;
|
||||
}
|
||||
int archiveCount = 0;
|
||||
for (StorageType storageType : lb.getStorageTypes()) {
|
||||
if (StorageType.ARCHIVE == storageType) {
|
||||
archiveCount++;
|
||||
}
|
||||
}
|
||||
LOG.info("Archive replica count, expected={} and actual={}",
|
||||
expectedArchiveCount, archiveCount);
|
||||
return expectedArchiveCount == archiveCount;
|
||||
}
|
||||
}, 100, 3000);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue