HBASE-21729 Extract ProcedureCoordinatorRpcs and ProcedureMemberRpcs from CoordinatedStateManager

This commit is contained in:
Jingyun Tian 2019-01-24 19:30:52 +08:00 committed by Jingyun Tian
parent e43c833145
commit 91dffb043a
8 changed files with 85 additions and 40 deletions

View File

@ -24,20 +24,20 @@ import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.errorhandling.ForeignException;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.master.MetricsMaster;
import org.apache.hadoop.hbase.procedure.MasterProcedureManager;
import org.apache.hadoop.hbase.procedure.Procedure;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinationManager;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinationManager;
import org.apache.hadoop.hbase.security.User;
import org.apache.hadoop.hbase.security.access.AccessChecker;
import org.apache.yetus.audience.InterfaceAudience;
@ -98,7 +98,7 @@ public class LogRollMasterProcedureManager extends MasterProcedureManager {
// setup the default procedure coordinator
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(master);
ProcedureCoordinationManager coordManager = new ZKProcedureCoordinationManager(master);
ProcedureCoordinatorRpcs comms =
coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);

View File

@ -22,17 +22,17 @@ import java.io.IOException;
import java.util.concurrent.ThreadPoolExecutor;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
import org.apache.hadoop.hbase.backup.impl.BackupManager;
import org.apache.hadoop.hbase.backup.master.LogRollMasterProcedureManager;
import org.apache.hadoop.hbase.coordination.ZkCoordinatedStateManager;
import org.apache.hadoop.hbase.errorhandling.ForeignExceptionDispatcher;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinationManager;
import org.apache.hadoop.hbase.procedure.ProcedureMember;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.procedure.RegionServerProcedureManager;
import org.apache.hadoop.hbase.procedure.Subprocedure;
import org.apache.hadoop.hbase.procedure.SubprocedureFactory;
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinationManager;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@ -156,7 +156,7 @@ public class LogRollRegionServerProcedureManager extends RegionServerProcedureMa
+ " setting");
return;
}
CoordinatedStateManager coordManager = new ZkCoordinatedStateManager(rss);
ProcedureCoordinationManager coordManager = new ZKProcedureCoordinationManager(rss);
this.memberRpcs = coordManager
.getProcedureMemberRpcs(LogRollMasterProcedureManager.ROLLLOG_PROCEDURE_SIGNATURE);

View File

@ -19,12 +19,7 @@ package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.coordination.SplitLogManagerCoordination;
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
import java.io.IOException;
/**
* Implementations of this interface will keep and return to clients
@ -47,15 +42,4 @@ public interface CoordinatedStateManager {
* Method to retrieve coordination for split log manager
*/
SplitLogManagerCoordination getSplitLogManagerCoordination();
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
*/
ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
throws IOException;
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
*/
ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException;
}

View File

@ -17,18 +17,11 @@
*/
package org.apache.hadoop.hbase.coordination;
import java.io.IOException;
import org.apache.hadoop.hbase.CoordinatedStateManager;
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs;
import org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs;
import org.apache.hadoop.hbase.procedure.ZKProcedureCoordinator;
import org.apache.hadoop.hbase.procedure.ZKProcedureMemberRpcs;
import org.apache.zookeeper.KeeperException;
/**
* ZooKeeper-based implementation of {@link org.apache.hadoop.hbase.CoordinatedStateManager}.
@ -55,15 +48,4 @@ public class ZkCoordinatedStateManager implements CoordinatedStateManager {
public SplitLogManagerCoordination getSplitLogManagerCoordination() {
return splitLogManagerCoordination;
}
@Override
public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode)
throws IOException {
return new ZKProcedureCoordinator(watcher, procType, coordNode);
}
@Override
public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException {
return new ZKProcedureMemberRpcs(watcher, procType);
}
}

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public interface ProcedureCoordinationManager {
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureCoordinatorRpcs}
*/
ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode);
/**
* Method to retrieve {@link org.apache.hadoop.hbase.procedure.ProcedureMemberRpcs}
*/
ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException;
}

View File

@ -136,6 +136,7 @@ public class ProcedureCoordinator {
* of IO problem. On errors, the procedure's monitor holds a reference to the exception
* that caused the failure.
*/
@SuppressWarnings("FutureReturnValueIgnored")
boolean submitProcedure(Procedure proc) {
// if the submitted procedure was null, then we don't want to run it
if (proc == null) {

View File

@ -119,6 +119,7 @@ public class ProcedureMember implements Closeable {
* could not be started. In the latter case, the subprocedure holds a reference to
* the exception that caused the failure.
*/
@SuppressWarnings("FutureReturnValueIgnored")
public boolean submitSubprocedure(Subprocedure subproc) {
// if the submitted subprocedure was null, bail.
if (subproc == null) {

View File

@ -0,0 +1,42 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.procedure;
import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.zookeeper.ZKWatcher;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class ZKProcedureCoordinationManager implements ProcedureCoordinationManager{
ZKWatcher watcher;
public ZKProcedureCoordinationManager(Server server) {
this.watcher = server.getZooKeeper();
}
@Override
public ProcedureCoordinatorRpcs getProcedureCoordinatorRpcs(String procType, String coordNode) {
return new ZKProcedureCoordinator(watcher, procType, coordNode);
}
@Override
public ProcedureMemberRpcs getProcedureMemberRpcs(String procType) throws KeeperException {
return new ZKProcedureMemberRpcs(watcher, procType);
}
}