HDDS-366. Update functions impacted by SCM chill mode in StorageContainerLocationProtocol. Contributed by Ajay Kumar.

This commit is contained in:
Xiaoyu Yao 2018-09-18 13:28:04 -07:00
parent 2cf8927683
commit 295cce39ed
12 changed files with 273 additions and 62 deletions

View File

@ -176,6 +176,13 @@ enum ScmOps {
keyBlocksInfoList = 2;
getScmInfo = 3;
deleteBlock = 4;
createReplicationPipeline = 5;
allocateContainer = 6;
getContainer = 7;
getContainerWithPipeline = 8;
listContainer = 9;
deleteContainer = 10;
queryNode = 11;
}
/**

View File

@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.server.Precheck;
/**
* SCM utility class.
*/
public final class ScmUtils {
private ScmUtils() {
}
/**
* Perform all prechecks for given scm operation.
*
* @param operation
* @param preChecks prechecks to be performed
*/
public static void preCheck(ScmOps operation, Precheck... preChecks)
throws SCMException {
for (Precheck preCheck : preChecks) {
preCheck.check(operation);
}
}
}

View File

@ -20,6 +20,7 @@
import org.apache.hadoop.conf.StorageUnit;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.container.Mapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
@ -30,7 +31,6 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.server.ChillModePrecheck;
import org.apache.hadoop.hdds.scm.server.Precheck;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.util.MBeans;
@ -190,7 +190,7 @@ public AllocatedBlock allocateBlock(final long size,
ReplicationType type, ReplicationFactor factor, String owner)
throws IOException {
LOG.trace("Size;{} , type : {}, factor : {} ", size, type, factor);
preCheck(ScmOps.allocateBlock, chillModePrecheck);
ScmUtils.preCheck(ScmOps.allocateBlock, chillModePrecheck);
if (size < 0 || size > containerSize) {
LOG.warn("Invalid block size requested : {}", size);
throw new SCMException("Unsupported block size: " + size,
@ -417,19 +417,6 @@ public SCMBlockDeletingService getSCMBlockDeletingService() {
return this.blockDeletingService;
}
/**
* Perform all prechecks for given operations.
*
* @param operation
* @param preChecks prechecks to be performed
*/
public void preCheck(ScmOps operation, Precheck... preChecks)
throws SCMException {
for (Precheck preCheck : preChecks) {
preCheck.check(operation);
}
}
@Override
public void onMessage(Boolean inChillMode, EventPublisher publisher) {
this.chillModePrecheck.setInChillMode(inChillMode);

View File

@ -274,12 +274,6 @@ public ContainerWithPipeline allocateContainer(
ContainerInfo containerInfo;
ContainerWithPipeline containerWithPipeline;
if (!nodeManager.isOutOfChillMode()) {
throw new SCMException(
"Unable to create container while in chill mode",
SCMException.ResultCodes.CHILL_MODE_EXCEPTION);
}
lock.lock();
try {
containerWithPipeline = containerStateManager.allocateContainer(

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.server;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
@ -28,15 +29,16 @@
* */
public class ChillModePrecheck implements Precheck<ScmOps> {
private boolean inChillMode;
private AtomicBoolean inChillMode = new AtomicBoolean(true);
public static final String PRECHECK_TYPE = "ChillModePrecheck";
public boolean check(ScmOps op) throws SCMException {
if(inChillMode && ChillModeRestrictedOps.isRestrictedInChillMode(op)) {
if (inChillMode.get() && ChillModeRestrictedOps
.isRestrictedInChillMode(op)) {
throw new SCMException("ChillModePrecheck failed for " + op,
ResultCodes.CHILL_MODE_EXCEPTION);
}
return inChillMode;
return inChillMode.get();
}
@Override
@ -45,10 +47,10 @@ public String type() {
}
public boolean isInChillMode() {
return inChillMode;
return inChillMode.get();
}
public void setInChillMode(boolean inChillMode) {
this.inChillMode = inChillMode;
this.inChillMode.set(inChillMode);
}
}

View File

@ -234,6 +234,7 @@ public static class ChillModeRestrictedOps {
static {
restrictedOps.add(ScmOps.allocateBlock);
restrictedOps.add(ScmOps.allocateContainer);
}
public static boolean isRestrictedInChillMode(ScmOps opName) {

View File

@ -27,15 +27,21 @@
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ScmOps;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerLocationProtocolProtos;
import org.apache.hadoop.hdds.scm.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.ScmUtils;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.Pipeline;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
import org.apache.hadoop.hdds.scm.protocolPB.StorageContainerLocationProtocolPB;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.ProtobufRpcEngine;
import org.apache.hadoop.ipc.RPC;
@ -71,13 +77,14 @@
* The RPC server that listens to requests from clients.
*/
public class SCMClientProtocolServer implements
StorageContainerLocationProtocol {
StorageContainerLocationProtocol, EventHandler<Boolean> {
private static final Logger LOG =
LoggerFactory.getLogger(SCMClientProtocolServer.class);
private final RPC.Server clientRpcServer;
private final InetSocketAddress clientRpcAddress;
private final StorageContainerManager scm;
private final OzoneConfiguration conf;
private ChillModePrecheck chillModePrecheck = new ChillModePrecheck();
public SCMClientProtocolServer(OzoneConfiguration conf,
StorageContainerManager scm) throws IOException {
@ -149,6 +156,7 @@ public String getRpcRemoteUsername() {
public ContainerWithPipeline allocateContainer(HddsProtos.ReplicationType
replicationType, HddsProtos.ReplicationFactor factor,
String owner) throws IOException {
ScmUtils.preCheck(ScmOps.allocateContainer, chillModePrecheck);
String remoteUser = getRpcRemoteUsername();
getScm().checkAdminAccess(remoteUser);
@ -167,12 +175,39 @@ public ContainerInfo getContainer(long containerID) throws IOException {
@Override
public ContainerWithPipeline getContainerWithPipeline(long containerID)
throws IOException {
if (chillModePrecheck.isInChillMode()) {
ContainerInfo contInfo = scm.getScmContainerManager()
.getContainer(containerID);
if (contInfo.isContainerOpen()) {
if (!hasRequiredReplicas(contInfo)) {
throw new SCMException("Open container " + containerID + " doesn't"
+ " have enough replicas to service this operation in "
+ "Chill mode.", ResultCodes.CHILL_MODE_EXCEPTION);
}
}
}
String remoteUser = getRpcRemoteUsername();
getScm().checkAdminAccess(remoteUser);
return scm.getScmContainerManager()
.getContainerWithPipeline(containerID);
}
/**
* Check if container reported replicas are equal or greater than required
* replication factor.
*/
private boolean hasRequiredReplicas(ContainerInfo contInfo) {
try{
return getScm().getScmContainerManager().getStateManager()
.getContainerReplicas(contInfo.containerID())
.size() >= contInfo.getReplicationFactor().getNumber();
} catch (SCMException ex) {
// getContainerReplicas throws exception if no replica's exist for given
// container.
return false;
}
}
@Override
public List<ContainerInfo> listContainer(long startContainerID,
int count) throws IOException {
@ -290,6 +325,22 @@ public StorageContainerManager getScm() {
return scm;
}
/**
* Set chill mode status based on SCMEvents.CHILL_MODE_STATUS event.
*/
@Override
public void onMessage(Boolean inChillMOde, EventPublisher publisher) {
chillModePrecheck.setInChillMode(inChillMOde);
}
/**
* Set chill mode status based on .
*/
public boolean getChillModeStatus() {
return chillModePrecheck.isInChillMode();
}
/**
* Query the System for Nodes.
*

View File

@ -65,6 +65,7 @@
import org.apache.hadoop.hdds.scm.pipelines.PipelineCloseHandler;
import org.apache.hadoop.hdds.scm.pipelines.PipelineActionEventHandler;
import org.apache.hadoop.hdds.server.ServiceRuntimeInfoImpl;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdfs.DFSUtil;
import org.apache.hadoop.io.IOUtils;
@ -241,29 +242,6 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
PipelineCloseHandler pipelineCloseHandler =
new PipelineCloseHandler(scmContainerManager);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
eventQueue.addHandler(SCMEvents.START_REPLICATION,
replicationStatus.getReplicationStatusListener());
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
replicationStatus.getChillModeStatusListener());
eventQueue
.addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
pipelineActionEventHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
scmChillModeManager);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
(BlockManagerImpl) scmBlockManager);
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
@ -298,6 +276,31 @@ private StorageContainerManager(OzoneConfiguration conf) throws IOException {
blockProtocolServer = new SCMBlockProtocolServer(conf, this);
clientProtocolServer = new SCMClientProtocolServer(conf, this);
httpServer = new StorageContainerManagerHttpServer(conf);
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, scmNodeManager);
eventQueue.addHandler(SCMEvents.NODE_REPORT, nodeReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_REPORT, containerReportHandler);
eventQueue.addHandler(SCMEvents.CONTAINER_ACTIONS, actionsHandler);
eventQueue.addHandler(SCMEvents.CLOSE_CONTAINER, closeContainerHandler);
eventQueue.addHandler(SCMEvents.NEW_NODE, newNodeHandler);
eventQueue.addHandler(SCMEvents.STALE_NODE, staleNodeHandler);
eventQueue.addHandler(SCMEvents.DEAD_NODE, deadNodeHandler);
eventQueue.addHandler(SCMEvents.CMD_STATUS_REPORT, cmdStatusReportHandler);
eventQueue.addHandler(SCMEvents.START_REPLICATION,
replicationStatus.getReplicationStatusListener());
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
replicationStatus.getChillModeStatusListener());
eventQueue
.addHandler(SCMEvents.PENDING_DELETE_STATUS, pendingDeleteHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS,
pipelineActionEventHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_CLOSE, pipelineCloseHandler);
eventQueue.addHandler(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
scmChillModeManager);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS,
(BlockManagerImpl) scmBlockManager);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, clientProtocolServer);
registerMXBean();
}
@ -830,6 +833,13 @@ public boolean isInChillMode() {
return scmChillModeManager.getInChillMode();
}
/**
* Returns EventPublisher.
*/
public EventPublisher getEventQueue(){
return eventQueue;
}
@VisibleForTesting
public double getCurrentContainerThreshold() {
return scmChillModeManager.getCurrentContainerThreshold();

View File

@ -205,14 +205,6 @@ public void testgetNoneExistentContainer() throws IOException {
mapping.getContainer(random.nextLong());
}
@Test
public void testChillModeAllocateContainerFails() throws IOException {
nodeManager.setChillmode(true);
thrown.expectMessage("Unable to create container while in chill mode");
mapping.allocateContainer(xceiverClientManager.getType(),
xceiverClientManager.getFactor(), containerOwner);
}
@Test
public void testContainerCreationLeaseTimeout() throws IOException,
InterruptedException {

View File

@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements.See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership.The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License.You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.server;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
/**
* Test class for @{@link SCMClientProtocolServer}.
* */
public class TestSCMClientProtocolServer {
private SCMClientProtocolServer scmClientProtocolServer;
private OzoneConfiguration config;
private EventQueue eventQueue;
@Before
public void setUp() throws Exception {
config = new OzoneConfiguration();
eventQueue = new EventQueue();
scmClientProtocolServer = new SCMClientProtocolServer(config, null);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, scmClientProtocolServer);
}
@After
public void tearDown() throws Exception {
}
@Test
public void testAllocateContainerFailureInChillMode() throws Exception {
LambdaTestUtils.intercept(SCMException.class,
"hillModePrecheck failed for allocateContainer", () -> {
scmClientProtocolServer.allocateContainer(
ReplicationType.STAND_ALONE, ReplicationFactor.ONE, "");
});
}
}

View File

@ -43,6 +43,8 @@
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleEvent;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@ -53,6 +55,7 @@
import org.apache.hadoop.hdds.scm.container.ContainerMapping;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.server.SCMChillModeManager;
@ -66,6 +69,7 @@
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.LambdaTestUtils;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
@ -551,4 +555,55 @@ public void testSCMChillMode() throws Exception {
cluster.shutdown();
}
@Test
public void testSCMChillModeRestrictedOp() throws Exception {
OzoneConfiguration conf = new OzoneConfiguration();
conf.set(OzoneConfigKeys.OZONE_METADATA_STORE_IMPL,
OzoneConfigKeys.OZONE_METADATA_STORE_IMPL_LEVELDB);
MiniOzoneClusterImpl cluster = (MiniOzoneClusterImpl) MiniOzoneCluster
.newBuilder(conf)
.setHbInterval(1000)
.setHbProcessorInterval(500)
.setStartDataNodes(false)
.build();
StorageContainerManager scm = cluster.getStorageContainerManager();
assertTrue(scm.isInChillMode());
LambdaTestUtils.intercept(SCMException.class,
"ChillModePrecheck failed for allocateContainer", () -> {
scm.getClientProtocolServer()
.allocateContainer(ReplicationType.STAND_ALONE,
ReplicationFactor.ONE, "");
});
cluster.startHddsDatanodes();
cluster.waitForClusterToBeReady();
assertFalse(scm.isInChillMode());
TestStorageContainerManagerHelper helper =
new TestStorageContainerManagerHelper(cluster, conf);
helper.createKeys(10, 4096);
SCMClientProtocolServer clientProtocolServer = cluster
.getStorageContainerManager().getClientProtocolServer();
final List<ContainerInfo> containers = scm.getScmContainerManager()
.getStateManager().getAllContainers();
scm.getEventQueue().fireEvent(SCMEvents.CHILL_MODE_STATUS, true);
assertFalse((scm.getClientProtocolServer()).getChillModeStatus());
GenericTestUtils.waitFor(() -> {
return clientProtocolServer.getChillModeStatus();
}, 50, 1000 * 5);
assertTrue(clientProtocolServer.getChillModeStatus());
LambdaTestUtils.intercept(SCMException.class,
"Open container " + containers.get(0).getContainerID() + " "
+ "doesn't have enough replicas to service this operation in Chill"
+ " mode.", () -> clientProtocolServer
.getContainerWithPipeline(containers.get(0).getContainerID()));
cluster.shutdown();
}
}

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.MiniOzoneCluster;
@ -60,6 +61,8 @@
@RunWith(Parameterized.class)
public class TestContainerSQLCli {
private EventQueue eventQueue;
@Parameterized.Parameters
public static Collection<Object[]> data() {
return Arrays.asList(new Object[][] {
@ -114,12 +117,16 @@ public void setup() throws Exception {
.getDatanodeDetails().getIpAddress();
cluster.getOzoneManager().stop();
cluster.getStorageContainerManager().stop();
eventQueue = new EventQueue();
nodeManager = cluster.getStorageContainerManager().getScmNodeManager();
mapping = new ContainerMapping(conf, nodeManager, 128,
new EventQueue());
blockManager = new BlockManagerImpl(conf, nodeManager, mapping, null);
eventQueue);
blockManager = new BlockManagerImpl(conf, nodeManager, mapping, eventQueue);
eventQueue.addHandler(SCMEvents.CHILL_MODE_STATUS, blockManager);
eventQueue.fireEvent(SCMEvents.CHILL_MODE_STATUS, false);
GenericTestUtils.waitFor(() -> {
return !blockManager.isScmInChillMode();
}, 10, 1000 * 15);
// blockManager.allocateBlock() will create containers if there is none
// stored in levelDB. The number of containers to create is the value of
// OZONE_SCM_CONTAINER_PROVISION_BATCH_SIZE which we set to 2.