diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java index 042aca38b96..f15db73bbed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BlockStorageMovementAttemptedItems.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode; import static org.apache.hadoop.util.Time.monotonicNow; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -54,6 +55,7 @@ public class BlockStorageMovementAttemptedItems { private final List storageMovementAttemptedResults; private volatile boolean monitorRunning = true; private Daemon timerThread = null; + private final StoragePolicySatisfier sps; // // It might take anywhere between 30 to 60 minutes before // a request is timed out. @@ -69,7 +71,8 @@ public class BlockStorageMovementAttemptedItems { public BlockStorageMovementAttemptedItems(long recheckTimeout, long selfRetryTimeout, - BlockStorageMovementNeeded unsatisfiedStorageMovementFiles) { + BlockStorageMovementNeeded unsatisfiedStorageMovementFiles, + StoragePolicySatisfier sps) { if (recheckTimeout > 0) { this.minCheckTimeout = Math.min(minCheckTimeout, recheckTimeout); } @@ -78,6 +81,7 @@ public class BlockStorageMovementAttemptedItems { this.blockStorageMovementNeeded = unsatisfiedStorageMovementFiles; storageMovementAttemptedItems = new HashMap<>(); storageMovementAttemptedResults = new ArrayList<>(); + this.sps = sps; } /** @@ -200,6 +204,9 @@ public class BlockStorageMovementAttemptedItems { } catch (InterruptedException ie) { LOG.info("BlocksStorageMovementAttemptResultMonitor thread " + "is interrupted.", ie); + } catch (IOException ie) { + LOG.warn("BlocksStorageMovementAttemptResultMonitor thread " + + "received exception and exiting.", ie); } } } @@ -248,7 +255,7 @@ public class BlockStorageMovementAttemptedItems { } @VisibleForTesting - void blockStorageMovementResultCheck() { + void blockStorageMovementResultCheck() throws IOException { synchronized (storageMovementAttemptedResults) { Iterator resultsIter = storageMovementAttemptedResults.iterator(); @@ -296,6 +303,9 @@ public class BlockStorageMovementAttemptedItems { + " reported from co-ordinating datanode. But the trackID " + "doesn't exists in storageMovementAttemptedItems list", storageMovementAttemptedResult.getTrackId()); + // Remove xattr for the track id. + this.sps.notifyBlkStorageMovementFinished( + storageMovementAttemptedResult.getTrackId()); } } // Remove trackID from the attempted list, if any. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java index 500314b6b4a..0df58bf57d3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirAttrOp.java @@ -534,6 +534,14 @@ public class FSDirAttrOp { return false; } + static void unprotectedRemoveSPSXAttr(INode inode, XAttr spsXAttr) + throws IOException{ + List existingXAttrs = XAttrStorage.readINodeXAttrs(inode); + existingXAttrs.remove(spsXAttr); + XAttrStorage.updateINodeXAttrs(inode, existingXAttrs, + INodesInPath.fromINode(inode).getLatestSnapshotId()); + } + private static void setDirStoragePolicy( FSDirectory fsd, INodesInPath iip, byte policyId) throws IOException { INode inode = FSDirectory.resolveLastINode(iip); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java index 1a061056488..4e4096b652d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSDirectory.java @@ -1418,6 +1418,22 @@ public class FSDirectory implements Closeable { getBlockManager().satisfyStoragePolicy(inode.getId()); } + /** + * Remove the SPS xattr from the inode, retrieve the inode from the + * block collection id. + * @param id + * - file block collection id. + */ + public void removeSPSXattr(long id) throws IOException { + final INode inode = getInode(id); + final XAttrFeature xaf = inode.getXAttrFeature(); + final XAttr spsXAttr = xaf.getXAttr(XATTR_SATISFY_STORAGE_POLICY); + + if (spsXAttr != null) { + FSDirAttrOp.unprotectedRemoveSPSXAttr(inode, spsXAttr); + } + } + private void addEncryptionZone(INodeWithAdditionalFields inode, XAttrFeature xaf) { if (xaf == null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java index 29c8a5db647..337d5b5da50 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/StoragePolicySatisfier.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hdfs.server.namenode; +import java.io.IOException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; @@ -91,7 +92,8 @@ public class StoragePolicySatisfier implements Runnable { conf.getLong( DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_KEY, DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_SELF_RETRY_TIMEOUT_MILLIS_DEFAULT), - storageMovementNeeded); + storageMovementNeeded, + this); } /** @@ -119,12 +121,6 @@ public class StoragePolicySatisfier implements Runnable { */ public synchronized void stop(boolean reconfigStop) { isRunning = false; - if (reconfigStop) { - LOG.info("Stopping StoragePolicySatisfier, as admin requested to " - + "deactivate it."); - } else { - LOG.info("Stopping StoragePolicySatisfier."); - } if (storagePolicySatisfierThread == null) { return; } @@ -135,8 +131,12 @@ public class StoragePolicySatisfier implements Runnable { } this.storageMovementsMonitor.stop(); if (reconfigStop) { - this.clearQueues(); + LOG.info("Stopping StoragePolicySatisfier, as admin requested to " + + "deactivate it."); + this.clearQueuesWithNotification(); this.blockManager.getDatanodeManager().addDropSPSWorkCommandsToAllDNs(); + } else { + LOG.info("Stopping StoragePolicySatisfier."); } } @@ -717,4 +717,33 @@ public class StoragePolicySatisfier implements Runnable { + "user requests on satisfying block storages would be discarded."); storageMovementNeeded.clearAll(); } + + /** + * Clean all the movements in storageMovementNeeded and notify + * to clean up required resources. + * @throws IOException + */ + private void clearQueuesWithNotification() { + Long id; + while ((id = storageMovementNeeded.get()) != null) { + try { + notifyBlkStorageMovementFinished(id); + } catch (IOException ie) { + LOG.warn("Failed to remove SPS " + + "xattr for collection id " + id, ie); + } + } + } + + /** + * When block movement has been finished successfully, some additional + * operations should be notified, for example, SPS xattr should be + * removed. + * @param trackId track id i.e., block collection id. + * @throws IOException + */ + public void notifyBlkStorageMovementFinished(long trackId) + throws IOException { + this.namesystem.getFSDirectory().removeSPSXattr(trackId); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java index afe351810ae..8c4107a2778 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/DFSTestUtil.java @@ -2454,6 +2454,6 @@ public class DFSTestUtil { + expectedStorageCount + " and actual=" + actualStorageCount); return expectedStorageCount == actualStorageCount; } - }, 1000, timeout); + }, 500, timeout); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java index 66411343167..95142d356bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestBlockStorageMovementAttemptedItems.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksStorageMovementResult; import org.junit.After; import org.junit.Before; import org.junit.Test; +import org.mockito.Mockito; /** * Tests that block storage movement attempt failures are reported from DN and @@ -36,10 +37,11 @@ public class TestBlockStorageMovementAttemptedItems { private final int selfRetryTimeout = 500; @Before - public void setup() { + public void setup() throws Exception { unsatisfiedStorageMovementFiles = new BlockStorageMovementNeeded(); + StoragePolicySatisfier sps = Mockito.mock(StoragePolicySatisfier.class); bsmAttemptedItems = new BlockStorageMovementAttemptedItems(100, - selfRetryTimeout, unsatisfiedStorageMovementFiles); + selfRetryTimeout, unsatisfiedStorageMovementFiles, sps); } @After diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java index e4b4290d92f..8c3359a9d34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestPersistentStoragePolicySatisfier.java @@ -20,16 +20,22 @@ package org.apache.hadoop.hdfs.server.namenode; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.StorageType; +import org.apache.hadoop.fs.XAttr; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.hdfs.XAttrHelper; import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil; import org.junit.Test; import java.io.IOException; +import java.util.List; + +import static org.apache.hadoop.hdfs.server.common.HdfsServerConstants.XATTR_SATISFY_STORAGE_POLICY; +import static org.junit.Assert.assertFalse; /** * Test persistence of satisfying files/directories. @@ -72,7 +78,16 @@ public class TestPersistentStoragePolicySatisfier { * @throws IOException */ public void clusterSetUp() throws Exception { - clusterSetUp(false); + clusterSetUp(false, new HdfsConfiguration()); + } + + /** + * Setup environment for every test case. + * @param hdfsConf hdfs conf. + * @throws Exception + */ + public void clusterSetUp(Configuration hdfsConf) throws Exception { + clusterSetUp(false, hdfsConf); } /** @@ -80,8 +95,9 @@ public class TestPersistentStoragePolicySatisfier { * @param isHAEnabled if true, enable simple HA. * @throws IOException */ - private void clusterSetUp(boolean isHAEnabled) throws Exception { - conf = new HdfsConfiguration(); + private void clusterSetUp(boolean isHAEnabled, Configuration newConf) + throws Exception { + conf = newConf; final int dnNumber = storageTypes.length; final short replication = 3; MiniDFSCluster.Builder clusterBuilder = new MiniDFSCluster.Builder(conf) @@ -188,7 +204,7 @@ public class TestPersistentStoragePolicySatisfier { public void testWithHA() throws Exception { try { // Enable HA env for testing. - clusterSetUp(true); + clusterSetUp(true, new HdfsConfiguration()); fs.setStoragePolicy(testFile, ALL_SSD); fs.satisfyStoragePolicy(testFile); @@ -297,6 +313,94 @@ public class TestPersistentStoragePolicySatisfier { } } + /** + * Tests to verify SPS xattr will be removed if the satisfy work has + * been finished, expect that the method satisfyStoragePolicy can be + * invoked on the same file again after the block movement has been + * finished: + * 1. satisfy storage policy of file1. + * 2. wait until storage policy is satisfied. + * 3. satisfy storage policy of file1 again + * 4. make sure step 3 works as expected. + * @throws Exception + */ + @Test(timeout = 300000) + public void testMultipleSatisfyStoragePolicy() throws Exception { + try { + // Lower block movement check for testing. + conf = new HdfsConfiguration(); + final long minCheckTimeout = 500; // minimum value + conf.setLong( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_RECHECK_TIMEOUT_MILLIS_KEY, + minCheckTimeout); + clusterSetUp(conf); + fs.setStoragePolicy(testFile, ONE_SSD); + fs.satisfyStoragePolicy(testFile); + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.SSD, 1, timeout, fs); + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.DISK, 2, timeout, fs); + + // Make sure that SPS xattr has been removed. + int retryTime = 0; + while (retryTime < 30) { + if (!fileContainsSPSXAttr(testFile)) { + break; + } + Thread.sleep(minCheckTimeout); + retryTime += 1; + } + + fs.setStoragePolicy(testFile, COLD); + fs.satisfyStoragePolicy(testFile); + DFSTestUtil.waitExpectedStorageType( + testFileName, StorageType.ARCHIVE, 3, timeout, fs); + } finally { + clusterShutdown(); + } + } + + /** + * Tests to verify SPS xattr is removed after SPS is dropped, + * expect that if the SPS is disabled/dropped, the SPS + * xattr should be removed accordingly: + * 1. satisfy storage policy of file1. + * 2. drop SPS thread in block manager. + * 3. make sure sps xattr is removed. + * @throws Exception + */ + @Test(timeout = 300000) + public void testDropSPS() throws Exception { + try { + clusterSetUp(); + fs.setStoragePolicy(testFile, ONE_SSD); + fs.satisfyStoragePolicy(testFile); + + cluster.getNamesystem().getBlockManager().deactivateSPS(); + + // Make sure satisfy xattr has been removed. + assertFalse(fileContainsSPSXAttr(testFile)); + + } finally { + clusterShutdown(); + } + } + + /** + * Check whether file contains SPS xattr. + * @param fileName file name. + * @return true if file contains SPS xattr. + * @throws IOException + */ + private boolean fileContainsSPSXAttr(Path fileName) throws IOException { + final INode inode = cluster.getNamesystem() + .getFSDirectory().getINode(fileName.toString()); + final XAttr satisfyXAttr = + XAttrHelper.buildXAttr(XATTR_SATISFY_STORAGE_POLICY); + List existingXAttrs = XAttrStorage.readINodeXAttrs(inode); + return existingXAttrs.contains(satisfyXAttr); + } + /** * Restart the hole env and trigger the DataNode's heart beats. * @throws Exception