HBASE-18646 [Backup] LogRollMasterProcedureManager: make procedure timeout, thread pool size configurable (Vladimir Rodionov)
This commit is contained in:
parent
7550c64ac2
commit
19bb4ef487
|
@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
|
import org.apache.hadoop.hbase.CoordinatedStateManagerFactory;
|
||||||
import org.apache.hadoop.hbase.ServerName;
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
|
import org.apache.hadoop.hbase.backup.BackupRestoreConstants;
|
||||||
|
@ -50,11 +51,17 @@ import org.apache.zookeeper.KeeperException;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class LogRollMasterProcedureManager extends MasterProcedureManager {
|
public class LogRollMasterProcedureManager extends MasterProcedureManager {
|
||||||
|
private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
|
||||||
|
|
||||||
public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
|
public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc";
|
||||||
public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
|
public static final String ROLLLOG_PROCEDURE_NAME = "rolllog";
|
||||||
private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class);
|
public static final String BACKUP_WAKE_MILLIS_KEY = "hbase.backup.logroll.wake.millis";
|
||||||
|
public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.logroll.timeout.millis";
|
||||||
|
public static final String BACKUP_POOL_THREAD_NUMBER_KEY = "hbase.backup.logroll.pool.thread.number";
|
||||||
|
|
||||||
|
public static final int BACKUP_WAKE_MILLIS_DEFAULT = 500;
|
||||||
|
public static final int BACKUP_TIMEOUT_MILLIS_DEFAULT = 180000;
|
||||||
|
public static final int BACKUP_POOL_THREAD_NUMBER_DEFAULT = 8;
|
||||||
private MasterServices master;
|
private MasterServices master;
|
||||||
private ProcedureCoordinator coordinator;
|
private ProcedureCoordinator coordinator;
|
||||||
private boolean done;
|
private boolean done;
|
||||||
|
@ -77,16 +84,25 @@ public class LogRollMasterProcedureManager extends MasterProcedureManager {
|
||||||
|
|
||||||
// setup the default procedure coordinator
|
// setup the default procedure coordinator
|
||||||
String name = master.getServerName().toString();
|
String name = master.getServerName().toString();
|
||||||
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1);
|
|
||||||
|
|
||||||
|
// get the configuration for the coordinator
|
||||||
|
Configuration conf = master.getConfiguration();
|
||||||
|
long wakeFrequency = conf.getInt(BACKUP_WAKE_MILLIS_KEY, BACKUP_WAKE_MILLIS_DEFAULT);
|
||||||
|
long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY,BACKUP_TIMEOUT_MILLIS_DEFAULT);
|
||||||
|
int opThreads = conf.getInt(BACKUP_POOL_THREAD_NUMBER_KEY,
|
||||||
|
BACKUP_POOL_THREAD_NUMBER_DEFAULT);
|
||||||
|
|
||||||
|
// setup the default procedure coordinator
|
||||||
|
ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads);
|
||||||
BaseCoordinatedStateManager coordManager =
|
BaseCoordinatedStateManager coordManager =
|
||||||
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory
|
(BaseCoordinatedStateManager) CoordinatedStateManagerFactory
|
||||||
.getCoordinatedStateManager(master.getConfiguration());
|
.getCoordinatedStateManager(master.getConfiguration());
|
||||||
coordManager.initialize(master);
|
coordManager.initialize(master);
|
||||||
|
|
||||||
ProcedureCoordinatorRpcs comms =
|
ProcedureCoordinatorRpcs comms =
|
||||||
coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
|
coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name);
|
||||||
|
this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency);
|
||||||
|
|
||||||
this.coordinator = new ProcedureCoordinator(comms, tpool);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
Loading…
Reference in New Issue