HDFS-7062. Archival Storage: skip under construction block for migration. Contributed by Jing Zhao.
commit 2689b6ca72
parent dba52ce0bb
@@ -321,7 +321,14 @@ public class Mover {
       final LocatedBlocks locatedBlocks = status.getBlockLocations();
       boolean hasRemaining = false;
-      for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()) {
+      final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
+      List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
+      for(int i = 0; i < lbs.size(); i++) {
+        if (i == lbs.size() - 1 && !lastBlkComplete) {
+          // last block is incomplete, skip it
+          continue;
+        }
+        LocatedBlock lb = lbs.get(i);
         final StorageTypeDiff diff = new StorageTypeDiff(types,
             lb.getStorageTypes());
         if (!diff.removeOverlap()) {
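
Aside: a minimal, self-contained sketch of the skip pattern the hunk above introduces, for readers who want it outside diff context. It assumes only the public LocatedBlocks/LocatedBlock API; the class name and the migration placeholder are illustrative, not part of this commit.

    // Sketch: skip the under-construction last block of an open file.
    import java.util.List;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;
    import org.apache.hadoop.hdfs.protocol.LocatedBlocks;

    class SkipOpenBlockSketch {
      /** Visit only the blocks that are safe to migrate. */
      static void forEachCompleteBlock(LocatedBlocks blocks) {
        // Only the last block of a file can be under construction.
        final boolean lastComplete = blocks.isLastBlockComplete();
        final List<LocatedBlock> lbs = blocks.getLocatedBlocks();
        for (int i = 0; i < lbs.size(); i++) {
          if (i == lbs.size() - 1 && !lastComplete) {
            continue; // still being written; leave it alone
          }
          LocatedBlock lb = lbs.get(i);
          // ... schedule lb for migration here ...
        }
      }
    }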
@@ -472,6 +479,7 @@ public class Mover {
           final ExitStatus r = m.run();
 
           if (r == ExitStatus.SUCCESS) {
+            IOUtils.cleanup(LOG, nnc);
             iter.remove();
           } else if (r != ExitStatus.IN_PROGRESS) {
             // must be an error statue, return
@@ -387,8 +387,8 @@ public class DFSAdmin extends FsShell {
     "\t[-shutdownDatanode <datanode_host:ipc_port> [upgrade]]\n" +
     "\t[-getDatanodeInfo <datanode_host:ipc_port>]\n" +
     "\t[-metasave filename]\n" +
-    "\t[-setStoragePolicy path policyName\n" +
-    "\t[-getStoragePolicy path\n" +
+    "\t[-setStoragePolicy path policyName]\n" +
+    "\t[-getStoragePolicy path]\n" +
     "\t[-help [cmd]]\n";
 
   /**
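
Aside: the bracket fixes above only correct the dfsadmin usage text; the commands themselves are unchanged. Example invocations consistent with the corrected help (path and policy name are illustrative):

    hdfs dfsadmin -setStoragePolicy /foo COLD
    hdfs dfsadmin -getStoragePolicy /foo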
@@ -30,9 +30,13 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.BlockLocation;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.BlockStoragePolicy;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
+import org.apache.hadoop.hdfs.DFSOutputStream;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
@@ -44,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants;
 import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
 import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
+import org.apache.hadoop.hdfs.protocol.LocatedBlocks;
 import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher;
 import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
@@ -489,6 +494,78 @@ public class TestStorageMover {
     new MigrationTest(clusterScheme, nsScheme).runBasicTest(true);
   }
 
+  /**
+   * Print a big banner in the test log to make debug easier.
+   */
+  static void banner(String string) {
+    LOG.info("\n\n\n\n================================================\n" +
+        string + "\n" +
+        "==================================================\n\n");
+  }
+
+  /**
+   * Move an open file into archival storage
+   */
+  @Test
+  public void testMigrateOpenFileToArchival() throws Exception {
+    LOG.info("testMigrateOpenFileToArchival");
+    final Path fooDir = new Path("/foo");
+    Map<Path, BlockStoragePolicy> policyMap = Maps.newHashMap();
+    policyMap.put(fooDir, COLD);
+    NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(fooDir), null,
+        BLOCK_SIZE, null, policyMap);
+    ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,
+        NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null);
+    MigrationTest test = new MigrationTest(clusterScheme, nsScheme);
+    test.setupCluster();
+
+    // create an open file
+    banner("writing to file /foo/bar");
+    final Path barFile = new Path(fooDir, "bar");
+    DFSTestUtil.createFile(test.dfs, barFile, BLOCK_SIZE, (short) 1, 0L);
+    FSDataOutputStream out = test.dfs.append(barFile);
+    out.writeBytes("hello, ");
+    ((DFSOutputStream) out.getWrappedStream()).hsync();
+
+    try {
+      banner("start data migration");
+      test.setStoragePolicy(); // set /foo to COLD
+      test.migrate();
+
+      // make sure the under construction block has not been migrated
+      LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks(
+          barFile.toString(), BLOCK_SIZE);
+      LOG.info("Locations: " + lbs);
+      List<LocatedBlock> blks = lbs.getLocatedBlocks();
+      Assert.assertEquals(1, blks.size());
+      Assert.assertEquals(1, blks.get(0).getLocations().length);
+
+      banner("finish the migration, continue writing");
+      // make sure the writing can continue
+      out.writeBytes("world!");
+      ((DFSOutputStream) out.getWrappedStream()).hsync();
+      IOUtils.cleanup(LOG, out);
+
+      lbs = test.dfs.getClient().getLocatedBlocks(
+          barFile.toString(), BLOCK_SIZE);
+      LOG.info("Locations: " + lbs);
+      blks = lbs.getLocatedBlocks();
+      Assert.assertEquals(1, blks.size());
+      Assert.assertEquals(1, blks.get(0).getLocations().length);
+
+      banner("finish writing, starting reading");
+      // check the content of /foo/bar
+      FSDataInputStream in = test.dfs.open(barFile);
+      byte[] buf = new byte[13];
+      // read from offset 1024
+      in.readFully(BLOCK_SIZE, buf, 0, buf.length);
+      IOUtils.cleanup(LOG, in);
+      Assert.assertEquals("hello, world!", new String(buf));
+    } finally {
+      test.shutdownCluster();
+    }
+  }
+
   /**
    * Test directories with Hot, Warm and Cold polices.
    */
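
Aside: the StorageTypeDiff above compares a policy's expected storage types against LocatedBlock.getStorageTypes(). A minimal check in the same spirit, e.g. for asserting in a test that every replica of a finalized block already sits on ARCHIVE. This is a sketch, not part of the commit; note StorageType lived in org.apache.hadoop.hdfs at the time and moved to org.apache.hadoop.fs in later releases.

    import org.apache.hadoop.hdfs.StorageType;
    import org.apache.hadoop.hdfs.protocol.LocatedBlock;

    class StorageTypeCheckSketch {
      /** True iff every reported replica of the block is on ARCHIVE storage. */
      static boolean allOnArchive(LocatedBlock lb) {
        for (StorageType t : lb.getStorageTypes()) {
          if (t != StorageType.ARCHIVE) {
            return false; // a replica is still on DISK/SSD
          }
        }
        return true;
      }
    }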