HDFS-11186. [SPS]: Daemon thread of SPS should start only in Active NN. Contributed by Wei Zhou
This commit is contained in:
parent
6215e35bb6
commit
681d2804c9
|
@ -440,9 +440,15 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
/** Storages accessible from multiple DNs. */
|
/** Storages accessible from multiple DNs. */
|
||||||
private final ProvidedStorageMap providedStorageMap;
|
private final ProvidedStorageMap providedStorageMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Whether HA is enabled.
|
||||||
|
*/
|
||||||
|
private final boolean haEnabled;
|
||||||
|
|
||||||
public BlockManager(final Namesystem namesystem, boolean haEnabled,
|
public BlockManager(final Namesystem namesystem, boolean haEnabled,
|
||||||
final Configuration conf) throws IOException {
|
final Configuration conf) throws IOException {
|
||||||
this.namesystem = namesystem;
|
this.namesystem = namesystem;
|
||||||
|
this.haEnabled = haEnabled;
|
||||||
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
datanodeManager = new DatanodeManager(this, namesystem, conf);
|
||||||
heartbeatManager = datanodeManager.getHeartbeatManager();
|
heartbeatManager = datanodeManager.getHeartbeatManager();
|
||||||
this.blockIdManager = new BlockIdManager(this);
|
this.blockIdManager = new BlockIdManager(this);
|
||||||
|
@ -713,7 +719,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
this.blockReportThread.start();
|
this.blockReportThread.start();
|
||||||
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
|
mxBeanName = MBeans.register("NameNode", "BlockStats", this);
|
||||||
bmSafeMode.activate(blockTotal);
|
bmSafeMode.activate(blockTotal);
|
||||||
if (sps != null) {
|
if (sps != null && !haEnabled) {
|
||||||
sps.start();
|
sps.start();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -5046,6 +5052,7 @@ public class BlockManager implements BlockStatsMXBean {
|
||||||
LOG.info("Storage policy satisfier is already running.");
|
LOG.info("Storage policy satisfier is already running.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
sps.start();
|
sps.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -48,6 +48,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorageReport;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
|
||||||
import org.apache.hadoop.io.IOUtils;
|
import org.apache.hadoop.io.IOUtils;
|
||||||
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.net.NetworkTopology;
|
import org.apache.hadoop.net.NetworkTopology;
|
||||||
|
@ -661,9 +662,13 @@ public class Mover {
|
||||||
try {
|
try {
|
||||||
spsRunning = nnc.getDistributedFileSystem().getClient()
|
spsRunning = nnc.getDistributedFileSystem().getClient()
|
||||||
.isStoragePolicySatisfierRunning();
|
.isStoragePolicySatisfierRunning();
|
||||||
} catch (StandbyException e) {
|
} catch (RemoteException e) {
|
||||||
System.err.println("Skip Standby Namenode. " + nnc.toString());
|
IOException cause = e.unwrapRemoteException();
|
||||||
continue;
|
if (cause instanceof StandbyException) {
|
||||||
|
System.err.println("Skip Standby Namenode. " + nnc.toString());
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
throw e;
|
||||||
}
|
}
|
||||||
if (spsRunning) {
|
if (spsRunning) {
|
||||||
System.err.println("Mover failed due to StoragePolicySatisfier"
|
System.err.println("Mover failed due to StoragePolicySatisfier"
|
||||||
|
|
|
@ -1290,6 +1290,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
|
FSDirEncryptionZoneOp.warmUpEdekCache(edekCacheLoader, dir,
|
||||||
edekCacheLoaderDelay, edekCacheLoaderInterval);
|
edekCacheLoaderDelay, edekCacheLoaderInterval);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
blockManager.activateSPS();
|
||||||
} finally {
|
} finally {
|
||||||
startingActiveService = false;
|
startingActiveService = false;
|
||||||
blockManager.checkSafeMode();
|
blockManager.checkSafeMode();
|
||||||
|
@ -1319,6 +1321,10 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
LOG.info("Stopping services started for active state");
|
LOG.info("Stopping services started for active state");
|
||||||
writeLock();
|
writeLock();
|
||||||
try {
|
try {
|
||||||
|
if (blockManager != null) {
|
||||||
|
blockManager.deactivateSPS();
|
||||||
|
}
|
||||||
|
|
||||||
stopSecretManager();
|
stopSecretManager();
|
||||||
leaseManager.stopMonitor();
|
leaseManager.stopMonitor();
|
||||||
if (nnrmthread != null) {
|
if (nnrmthread != null) {
|
||||||
|
|
|
@ -2137,6 +2137,13 @@ public class NameNode extends ReconfigurableBase implements
|
||||||
+ "we must pass true/false only"));
|
+ "we must pass true/false only"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (!isActiveState()) {
|
||||||
|
throw new ReconfigurationException(property, newVal,
|
||||||
|
getConf().get(property), new HadoopIllegalArgumentException(
|
||||||
|
"Activating or deactivating storage policy satisfier service on "
|
||||||
|
+ state + " NameNode is not allowed"));
|
||||||
|
}
|
||||||
|
|
||||||
boolean activateSPS = Boolean.parseBoolean(newVal);
|
boolean activateSPS = Boolean.parseBoolean(newVal);
|
||||||
if (activateSPS) {
|
if (activateSPS) {
|
||||||
namesystem.getBlockManager().activateSPS();
|
namesystem.getBlockManager().activateSPS();
|
||||||
|
|
|
@ -0,0 +1,109 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.conf.ReconfigurationException;
|
||||||
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.MiniDFSNNTopology;
|
||||||
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests that StoragePolicySatisfier is able to work with HA enabled.
|
||||||
|
*/
|
||||||
|
public class TestStoragePolicySatisfierWithHA {
|
||||||
|
private MiniDFSCluster cluster = null;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() throws IOException {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.nnTopology(MiniDFSNNTopology.simpleHATopology())
|
||||||
|
.numDataNodes(1)
|
||||||
|
.build();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests to verify that SPS should run/stop automatically when NN state
|
||||||
|
* changes between Standby and Active.
|
||||||
|
*/
|
||||||
|
@Test(timeout = 100000)
|
||||||
|
public void testWhenNNHAStateChanges() throws IOException {
|
||||||
|
try {
|
||||||
|
DistributedFileSystem fs;
|
||||||
|
boolean running;
|
||||||
|
|
||||||
|
cluster.waitActive();
|
||||||
|
fs = cluster.getFileSystem(0);
|
||||||
|
|
||||||
|
try {
|
||||||
|
fs.getClient().isStoragePolicySatisfierRunning();
|
||||||
|
Assert.fail("Call this function to Standby NN should "
|
||||||
|
+ "raise an exception.");
|
||||||
|
} catch (RemoteException e) {
|
||||||
|
IOException cause = e.unwrapRemoteException();
|
||||||
|
if (!(cause instanceof StandbyException)) {
|
||||||
|
Assert.fail("Unexpected exception happened " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
cluster.transitionToActive(0);
|
||||||
|
running = fs.getClient().isStoragePolicySatisfierRunning();
|
||||||
|
Assert.assertTrue("StoragePolicySatisfier should be active "
|
||||||
|
+ "when NN transits from Standby to Active mode.", running);
|
||||||
|
|
||||||
|
// NN transits from Active to Standby
|
||||||
|
cluster.transitionToStandby(0);
|
||||||
|
try {
|
||||||
|
fs.getClient().isStoragePolicySatisfierRunning();
|
||||||
|
Assert.fail("NN in Standby again, call this function should "
|
||||||
|
+ "raise an exception.");
|
||||||
|
} catch (RemoteException e) {
|
||||||
|
IOException cause = e.unwrapRemoteException();
|
||||||
|
if (!(cause instanceof StandbyException)) {
|
||||||
|
Assert.fail("Unexpected exception happened " + e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
cluster.getNameNode(0).reconfigurePropertyImpl(
|
||||||
|
DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY, "false");
|
||||||
|
Assert.fail("It's not allowed to activate or deactivate"
|
||||||
|
+ " StoragePolicySatisfier on Standby NameNode");
|
||||||
|
} catch (ReconfigurationException e) {
|
||||||
|
GenericTestUtils.assertExceptionContains("Could not change property "
|
||||||
|
+ DFSConfigKeys.DFS_STORAGE_POLICY_SATISFIER_ACTIVATE_KEY
|
||||||
|
+ " from 'true' to 'false'", e);
|
||||||
|
GenericTestUtils.assertExceptionContains(
|
||||||
|
"Activating or deactivating storage policy satisfier service on "
|
||||||
|
+ "standby NameNode is not allowed", e.getCause());
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue