diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 872261ee7f7..5bacc7d381c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -440,9 +440,15 @@ public class BlockManager implements BlockStatsMXBean { /** Storages accessible from multiple DNs. */ private final ProvidedStorageMap providedStorageMap; + /** + * Whether HA is enabled. + */ + private final boolean haEnabled; + public BlockManager(final Namesystem namesystem, boolean haEnabled, final Configuration conf) throws IOException { this.namesystem = namesystem; + this.haEnabled = haEnabled; datanodeManager = new DatanodeManager(this, namesystem, conf); heartbeatManager = datanodeManager.getHeartbeatManager(); this.blockIdManager = new BlockIdManager(this); @@ -713,7 +719,7 @@ public class BlockManager implements BlockStatsMXBean { this.blockReportThread.start(); mxBeanName = MBeans.register("NameNode", "BlockStats", this); bmSafeMode.activate(blockTotal); - if (sps != null) { + if (sps != null && !haEnabled) { sps.start(); } } @@ -5046,6 +5052,7 @@ public class BlockManager implements BlockStatsMXBean { LOG.info("Storage policy satisfier is already running."); return; } + sps.start(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index ce78bde3ae9..b4e9716803d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; +import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; @@ -661,9 +662,13 @@ public class Mover { try { spsRunning = nnc.getDistributedFileSystem().getClient() .isStoragePolicySatisfierRunning(); - } catch (StandbyException e) { - System.err.println("Skip Standby Namenode. " + nnc.toString()); - continue; + } catch (RemoteException e) { + IOException cause = e.unwrapRemoteException(); + if (cause instanceof StandbyException) { + System.err.println("Skip Standby Namenode. " + nnc.toString()); + continue; + } + throw e; } if (spsRunning) { System.err.println("Mover failed due to StoragePolicySatisfier" diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 02e47af3846..474a7e30578 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -1290,6 +1290,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir, edekCacheLoaderDelay, edekCacheLoaderInterval); } + + blockManager.activateSPS(); } finally { startingActiveService = false; blockManager.checkSafeMode(); @@ -1319,6 +1321,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, LOG.info("Stopping services started for active state"); writeLock(); try { + if (blockManager != null) { + blockManager.deactivateSPS(); + } + stopSecretManager(); leaseManager.stopMonitor(); if (nnrmthread != null) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index 1c94da4958e..2eb2f04b204 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -2137,6 +2137,13 @@ public class NameNode extends ReconfigurableBase implements + "we must pass true/false only")); } + if (!isActiveState()) { + throw new ReconfigurationException(property, newVal, + getConf().get(property), new HadoopIllegalArgumentException( + "Activating or deactivating storage policy satisfier service on " + + state + " NameNode is not allowed")); + } + boolean activateSPS = Boolean.parseBoolean(newVal); if (activateSPS) { namesystem.getBlockManager().activateSPS(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java new file mode 100644 index 00000000000..4d226ffb9b7 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestStoragePolicySatisfierWithHA.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.namenode; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.conf.ReconfigurationException; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DistributedFileSystem; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.MiniDFSNNTopology; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; +import org.apache.hadoop.test.GenericTestUtils; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; + +/** + * Tests that StoragePolicySatisfier is able to work with HA enabled. + */ +public class TestStoragePolicySatisfierWithHA { + private MiniDFSCluster cluster = null; + + @Before + public void setUp() throws IOException { + Configuration conf = new Configuration(); + cluster = new MiniDFSCluster.Builder(conf) + .nnTopology(MiniDFSNNTopology.simpleHATopology()) + .numDataNodes(1) + .build(); + } + + /** + * Tests to verify that SPS should run/stop automatically when NN state + * changes between Standby and Active. + */ + @Test(timeout = 100000) + public void testWhenNNHAStateChanges() throws IOException { + try { + DistributedFileSystem fs; + boolean running; + + cluster.waitActive(); + fs = cluster.getFileSystem(0); + + try { + fs.getClient().isStoragePolicySatisfierRunning(); + Assert.fail("Call this function to Standby NN should " + + "raise an exception."); + } catch (RemoteException e) { + IOException cause = e.unwrapRemoteException(); + if (!(cause instanceof StandbyException)) { + Assert.fail("Unexpected exception happened " + e); + } + } + + cluster.transitionToActive(0); + running = fs.getClient().isStoragePolicySatisfierRunning(); + Assert.assertTrue("StoragePolicySatisfier should be active " + + "when NN transits from Standby to Active mode.", running); + + // NN transits from Active to Standby + cluster.transitionToStandby(0); + try { + fs.getClient().isStoragePolicySatisfierRunning(); + Assert.fail("NN in Standby again, call this function should " + + "raise an exception."); + } catch (RemoteException e) { + IOException cause = e.unwrapRemoteException(); + if (!(cause instanceof StandbyException)) { + Assert.fail("Unexpected exception happened " + e); + } + } + + try { + cluster.getNameNode(0).reconfigurePropertyImpl( + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, "false"); + Assert.fail("It's not allowed to activate or deactivate" + + " StoragePolicySatisfier on Standby NameNode"); + } catch (ReconfigurationException e) { + GenericTestUtils.assertExceptionContains("Could not change property " + + DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY + + " from 'true' to 'false'", e); + GenericTestUtils.assertExceptionContains( + "Activating or deactivating storage policy satisfier service on " + + "standby NameNode is not allowed", e.getCause()); + } + } finally { + cluster.shutdown(); + } + } +}