HDDS-1488. Scm cli command to start/stop replication manager.

Signed-off-by: Anu Engineer <aengineer@apache.org>
This commit is contained in:
Nanda kumar 2019-08-03 19:01:29 +05:30 committed by Anu Engineer
parent 689a80d3ce
commit 69b74e9016
15 changed files with 458 additions and 17 deletions

View File

@ -459,4 +459,21 @@ public boolean inSafeMode() throws IOException {
public boolean forceExitSafeMode() throws IOException {
return storageContainerLocationClient.forceExitSafeMode();
}
@Override
public void startReplicationManager() throws IOException {
storageContainerLocationClient.startReplicationManager();
}
@Override
public void stopReplicationManager() throws IOException {
storageContainerLocationClient.stopReplicationManager();
}
@Override
public boolean getReplicationManagerStatus() throws IOException {
return storageContainerLocationClient.getReplicationManagerStatus();
}
}

View File

@ -203,4 +203,23 @@ Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
* @throws IOException
*/
boolean forceExitSafeMode() throws IOException;
/**
* Start ReplicationManager.
*/
void startReplicationManager() throws IOException;
/**
* Stop ReplicationManager.
*/
void stopReplicationManager() throws IOException;
/**
* Returns ReplicationManager status.
*
* @return True if ReplicationManager is running, false otherwise.
*/
boolean getReplicationManagerStatus() throws IOException;
}

View File

@ -177,4 +177,22 @@ Pipeline createReplicationPipeline(HddsProtos.ReplicationType type,
* @throws IOException
*/
boolean forceExitSafeMode() throws IOException;
/**
* Start ReplicationManager.
*/
void startReplicationManager() throws IOException;
/**
* Stop ReplicationManager.
*/
void stopReplicationManager() throws IOException;
/**
* Returns ReplicationManager status.
*
* @return True if ReplicationManager is running, false otherwise.
*/
boolean getReplicationManagerStatus() throws IOException;
}

View File

@ -29,6 +29,10 @@
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.InSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@ -407,6 +411,41 @@ public boolean forceExitSafeMode() throws IOException {
}
}
@Override
public void startReplicationManager() throws IOException {
try {
StartReplicationManagerRequestProto request =
StartReplicationManagerRequestProto.getDefaultInstance();
rpcProxy.startReplicationManager(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void stopReplicationManager() throws IOException {
try {
StopReplicationManagerRequestProto request =
StopReplicationManagerRequestProto.getDefaultInstance();
rpcProxy.stopReplicationManager(NULL_RPC_CONTROLLER, request);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public boolean getReplicationManagerStatus() throws IOException {
try {
ReplicationManagerStatusRequestProto request =
ReplicationManagerStatusRequestProto.getDefaultInstance();
ReplicationManagerStatusResponseProto response =
rpcProxy.getReplicationManagerStatus(NULL_RPC_CONTROLLER, request);
return response.getIsRunning();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Object getUnderlyingProxyObject() {
return rpcProxy;

View File

@ -36,7 +36,10 @@ public enum SCMAction implements AuditAction {
DELETE_CONTAINER,
IN_SAFE_MODE,
FORCE_EXIT_SAFE_MODE,
SORT_DATANODE;
SORT_DATANODE,
START_REPLICATION_MANAGER,
STOP_REPLICATION_MANAGER,
GET_REPLICATION_MANAGER_STATUS;
@Override
public String getAction() {

View File

@ -33,6 +33,12 @@
.StorageContainerLocationProtocolProtos.ForceExitSafeModeResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.GetContainerWithPipelineResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StartReplicationManagerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.StopReplicationManagerResponseProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusRequestProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerLocationProtocolProtos.ReplicationManagerStatusResponseProto;
import org.apache.hadoop.hdds.scm.ScmInfo;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
@ -306,4 +312,44 @@ public ForceExitSafeModeResponseProto forceExitSafeMode(
throw new ServiceException(ex);
}
}
@Override
public StartReplicationManagerResponseProto startReplicationManager(
RpcController controller, StartReplicationManagerRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil.importAndCreateScope(
"startReplicationManager", request.getTraceID())) {
impl.startReplicationManager();
return StartReplicationManagerResponseProto.newBuilder().build();
} catch (IOException ex) {
throw new ServiceException(ex);
}
}
@Override
public StopReplicationManagerResponseProto stopReplicationManager(
RpcController controller, StopReplicationManagerRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil.importAndCreateScope(
"stopReplicationManager", request.getTraceID())) {
impl.stopReplicationManager();
return StopReplicationManagerResponseProto.newBuilder().build();
} catch (IOException ex) {
throw new ServiceException(ex);
}
}
@Override
public ReplicationManagerStatusResponseProto getReplicationManagerStatus(
RpcController controller, ReplicationManagerStatusRequestProto request)
throws ServiceException {
try (Scope ignored = TracingUtil.importAndCreateScope(
"getReplicationManagerStatus", request.getTraceID())) {
return ReplicationManagerStatusResponseProto.newBuilder()
.setIsRunning(impl.getReplicationManagerStatus()).build();
} catch (IOException ex) {
throw new ServiceException(ex);
}
}
}

View File

@ -192,6 +192,28 @@ message ForceExitSafeModeResponseProto {
required bool exitedSafeMode = 1;
}
message StartReplicationManagerRequestProto {
optional string traceID = 1;
}
message StartReplicationManagerResponseProto {
}
message StopReplicationManagerRequestProto {
optional string traceID = 1;
}
message StopReplicationManagerResponseProto {
}
message ReplicationManagerStatusRequestProto {
optional string traceID = 1;
}
message ReplicationManagerStatusResponseProto {
required bool isRunning = 1;
}
/**
* Protocol used from an HDFS node to StorageContainerManager. See the request
* and response messages for details of the RPC calls.
@ -275,4 +297,13 @@ service StorageContainerLocationProtocolService {
*/
rpc forceExitSafeMode(ForceExitSafeModeRequestProto)
returns (ForceExitSafeModeResponseProto);
rpc startReplicationManager(StartReplicationManagerRequestProto)
returns (StartReplicationManagerResponseProto);
rpc stopReplicationManager(StopReplicationManagerRequestProto)
returns (StopReplicationManagerResponseProto);
rpc getReplicationManagerStatus(ReplicationManagerStatusRequestProto)
returns (ReplicationManagerStatusResponseProto);
}

View File

@ -104,17 +104,17 @@ public class ReplicationManager {
*/
private final Map<ContainerID, List<InflightAction>> inflightDeletion;
/**
* ReplicationMonitor thread is the one which wakes up at configured
* interval and processes all the containers.
*/
private final Thread replicationMonitor;
/**
* ReplicationManager specific configuration.
*/
private final ReplicationManagerConfiguration conf;
/**
* ReplicationMonitor thread is the one which wakes up at configured
* interval and processes all the containers.
*/
private Thread replicationMonitor;
/**
* Flag used for checking if the ReplicationMonitor thread is running or
* not.
@ -132,28 +132,28 @@ public class ReplicationManager {
public ReplicationManager(final ReplicationManagerConfiguration conf,
final ContainerManager containerManager,
final ContainerPlacementPolicy containerPlacement,
final EventPublisher eventPublisher,
final LockManager lockManager) {
final EventPublisher eventPublisher,
final LockManager<ContainerID> lockManager) {
this.containerManager = containerManager;
this.containerPlacement = containerPlacement;
this.eventPublisher = eventPublisher;
this.lockManager = lockManager;
this.inflightReplication = new HashMap<>();
this.inflightDeletion = new HashMap<>();
this.replicationMonitor = new Thread(this::run);
this.replicationMonitor.setName("ReplicationMonitor");
this.replicationMonitor.setDaemon(true);
this.conf = conf;
this.running = false;
this.inflightReplication = new HashMap<>();
this.inflightDeletion = new HashMap<>();
}
/**
* Starts Replication Monitor thread.
*/
public synchronized void start() {
if (!running) {
if (!isRunning()) {
LOG.info("Starting Replication Monitor Thread.");
running = true;
replicationMonitor = new Thread(this::run);
replicationMonitor.setName("ReplicationMonitor");
replicationMonitor.setDaemon(true);
replicationMonitor.start();
} else {
LOG.info("Replication Monitor Thread is already running.");
@ -166,7 +166,13 @@ public synchronized void start() {
* @return true if running, false otherwise
*/
public boolean isRunning() {
return replicationMonitor.isAlive();
if (!running) {
synchronized (this) {
return replicationMonitor != null
&& replicationMonitor.isAlive();
}
}
return true;
}
/**
@ -185,6 +191,8 @@ synchronized void processContainersNow() {
public synchronized void stop() {
if (running) {
LOG.info("Stopping Replication Monitor Thread.");
inflightReplication.clear();
inflightDeletion.clear();
running = false;
notify();
} else {

View File

@ -469,6 +469,27 @@ public boolean forceExitSafeMode() throws IOException {
return scm.exitSafeMode();
}
@Override
public void startReplicationManager() {
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.START_REPLICATION_MANAGER, null));
scm.getReplicationManager().start();
}
@Override
public void stopReplicationManager() {
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.STOP_REPLICATION_MANAGER, null));
scm.getReplicationManager().stop();
}
@Override
public boolean getReplicationManagerStatus() {
AUDIT.logWriteSuccess(buildAuditMessageForSuccess(
SCMAction.GET_REPLICATION_MANAGER_STATUS, null));
return scm.getReplicationManager().isRunning();
}
/**
* Queries a list of Node that match a set of statuses.
*

View File

@ -116,6 +116,22 @@ public void setup() throws IOException, InterruptedException {
Thread.sleep(100L);
}
/**
* Checks if restarting of replication manager works.
*/
@Test
public void testReplicationManagerRestart() throws InterruptedException {
Assert.assertTrue(replicationManager.isRunning());
replicationManager.stop();
// Stop is a non-blocking call, it might take sometime for the
// ReplicationManager to shutdown
Thread.sleep(500);
Assert.assertFalse(replicationManager.isRunning());
replicationManager.start();
Assert.assertTrue(replicationManager.isRunning());
}
/**
* Open containers are not handled by ReplicationManager.
* This test-case makes sure that ReplicationManages doesn't take

View File

@ -0,0 +1,54 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.cli;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.cli.MissingSubcommandException;
import picocli.CommandLine.Command;
import picocli.CommandLine.ParentCommand;
import java.util.concurrent.Callable;
/**
* Subcommand to group replication manager related operations.
*/
@Command(
name = "replicationmanager",
description = "ReplicationManager specific operations",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class,
subcommands = {
ReplicationManagerStartSubcommand.class,
ReplicationManagerStopSubcommand.class,
ReplicationManagerStatusSubcommand.class
})
public class ReplicationManagerCommands implements Callable<Void> {
@ParentCommand
private SCMCLI parent;
public SCMCLI getParent() {
return parent;
}
@Override
public Void call() throws Exception {
throw new MissingSubcommandException(
this.parent.getCmd().getSubcommands().get("replicationmanager"));
}
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.cli;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.ParentCommand;
import java.util.concurrent.Callable;
/**
* This is the handler that process safe mode check command.
*/
@Command(
name = "start",
description = "Start ReplicationManager",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class)
public class ReplicationManagerStartSubcommand implements Callable<Void> {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationManagerStartSubcommand.class);
@ParentCommand
private ReplicationManagerCommands parent;
@Override
public Void call() throws Exception {
try (ScmClient scmClient = parent.getParent().createScmClient()) {
scmClient.startReplicationManager();
LOG.info("Starting ReplicationManager...");
return null;
}
}
}

View File

@ -0,0 +1,60 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.cli;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.ParentCommand;
import java.util.concurrent.Callable;
/**
* This is the handler that process safe mode check command.
*/
@Command(
name = "status",
description = "Check if ReplicationManager is running or not",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class)
public class ReplicationManagerStatusSubcommand implements Callable<Void> {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationManagerStatusSubcommand.class);
@ParentCommand
private ReplicationManagerCommands parent;
@Override
public Void call() throws Exception {
try (ScmClient scmClient = parent.getParent().createScmClient()) {
boolean execReturn = scmClient.getReplicationManagerStatus();
// Output data list
if(execReturn){
LOG.info("ReplicationManager is Running.");
} else {
LOG.info("ReplicationManager is Not Running.");
}
return null;
}
}
}

View File

@ -0,0 +1,55 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdds.scm.cli;
import org.apache.hadoop.hdds.cli.HddsVersionProvider;
import org.apache.hadoop.hdds.scm.client.ScmClient;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import picocli.CommandLine.Command;
import picocli.CommandLine.ParentCommand;
import java.util.concurrent.Callable;
/**
* This is the handler that process safe mode check command.
*/
@Command(
name = "stop",
description = "Stop ReplicationManager",
mixinStandardHelpOptions = true,
versionProvider = HddsVersionProvider.class)
public class ReplicationManagerStopSubcommand implements Callable<Void> {
private static final Logger LOG =
LoggerFactory.getLogger(ReplicationManagerStopSubcommand.class);
@ParentCommand
private ReplicationManagerCommands parent;
@Override
public Void call() throws Exception {
try (ScmClient scmClient = parent.getParent().createScmClient()) {
scmClient.stopReplicationManager();
LOG.info("Stopping ReplicationManager...");
LOG.info("Requested SCM to stop ReplicationManager, " +
"it might take sometime for the ReplicationManager to stop.");
return null;
}
}
}

View File

@ -85,7 +85,8 @@
CloseSubcommand.class,
ListPipelinesSubcommand.class,
ClosePipelineSubcommand.class,
TopologySubcommand.class
TopologySubcommand.class,
ReplicationManagerCommands.class
},
mixinStandardHelpOptions = true)
public class SCMCLI extends GenericCli {