diff --git a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java index 47e428c4c4c..88c345569ae 100644 --- a/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java +++ b/hbase-backup/src/main/java/org/apache/hadoop/hbase/backup/master/LogRollMasterProcedureManager.java @@ -25,6 +25,7 @@ import java.util.concurrent.ThreadPoolExecutor; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.CoordinatedStateManagerFactory; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.backup.BackupRestoreConstants; @@ -50,11 +51,17 @@ import org.apache.zookeeper.KeeperException; */ @InterfaceAudience.Private public class LogRollMasterProcedureManager extends MasterProcedureManager { + private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); public static final String ROLLLOG_PROCEDURE_SIGNATURE = "rolllog-proc"; public static final String ROLLLOG_PROCEDURE_NAME = "rolllog"; - private static final Log LOG = LogFactory.getLog(LogRollMasterProcedureManager.class); + public static final String BACKUP_WAKE_MILLIS_KEY = "hbase.backup.logroll.wake.millis"; + public static final String BACKUP_TIMEOUT_MILLIS_KEY = "hbase.backup.logroll.timeout.millis"; + public static final String BACKUP_POOL_THREAD_NUMBER_KEY = "hbase.backup.logroll.pool.thread.number"; + public static final int BACKUP_WAKE_MILLIS_DEFAULT = 500; + public static final int BACKUP_TIMEOUT_MILLIS_DEFAULT = 180000; + public static final int BACKUP_POOL_THREAD_NUMBER_DEFAULT = 8; private MasterServices master; private ProcedureCoordinator coordinator; private boolean done; @@ -77,16 +84,25 @@ public class LogRollMasterProcedureManager extends MasterProcedureManager { // setup the default procedure coordinator String name = master.getServerName().toString(); - ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, 1); + + + // get the configuration for the coordinator + Configuration conf = master.getConfiguration(); + long wakeFrequency = conf.getInt(BACKUP_WAKE_MILLIS_KEY, BACKUP_WAKE_MILLIS_DEFAULT); + long timeoutMillis = conf.getLong(BACKUP_TIMEOUT_MILLIS_KEY,BACKUP_TIMEOUT_MILLIS_DEFAULT); + int opThreads = conf.getInt(BACKUP_POOL_THREAD_NUMBER_KEY, + BACKUP_POOL_THREAD_NUMBER_DEFAULT); + + // setup the default procedure coordinator + ThreadPoolExecutor tpool = ProcedureCoordinator.defaultPool(name, opThreads); BaseCoordinatedStateManager coordManager = (BaseCoordinatedStateManager) CoordinatedStateManagerFactory - .getCoordinatedStateManager(master.getConfiguration()); + .getCoordinatedStateManager(master.getConfiguration()); coordManager.initialize(master); - ProcedureCoordinatorRpcs comms = coordManager.getProcedureCoordinatorRpcs(getProcedureSignature(), name); + this.coordinator = new ProcedureCoordinator(comms, tpool, timeoutMillis, wakeFrequency); - this.coordinator = new ProcedureCoordinator(comms, tpool); } @Override @@ -152,4 +168,4 @@ public class LogRollMasterProcedureManager extends MasterProcedureManager { return done; } -} \ No newline at end of file +}