HBASE-22810 Initialize an separate ThreadPoolExecutor for taking/restoring snapshot (#486)
This commit is contained in:
parent
d06f877233
commit
f49a86ae4f
|
@ -1439,6 +1439,33 @@ public final class HConstants {
|
||||||
"hbase.util.default.lossycounting.errorrate";
|
"hbase.util.default.lossycounting.errorrate";
|
||||||
public static final String NOT_IMPLEMENTED = "Not implemented";
|
public static final String NOT_IMPLEMENTED = "Not implemented";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Configurations for master executor services.
|
||||||
|
*/
|
||||||
|
public static final String MASTER_OPEN_REGION_THREADS =
|
||||||
|
"hbase.master.executor.openregion.threads";
|
||||||
|
public static final int MASTER_OPEN_REGION_THREADS_DEFAULT = 5;
|
||||||
|
|
||||||
|
public static final String MASTER_CLOSE_REGION_THREADS =
|
||||||
|
"hbase.master.executor.closeregion.threads";
|
||||||
|
public static final int MASTER_CLOSE_REGION_THREADS_DEFAULT = 5;
|
||||||
|
|
||||||
|
public static final String MASTER_SERVER_OPERATIONS_THREADS =
|
||||||
|
"hbase.master.executor.serverops.threads";
|
||||||
|
public static final int MASTER_SERVER_OPERATIONS_THREADS_DEFAULT = 5;
|
||||||
|
|
||||||
|
public static final String MASTER_META_SERVER_OPERATIONS_THREADS =
|
||||||
|
"hbase.master.executor.meta.serverops.threads";
|
||||||
|
public static final int MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT = 5;
|
||||||
|
|
||||||
|
public static final String MASTER_LOG_REPLAY_OPS_THREADS =
|
||||||
|
"hbase.master.executor.logreplayops.threads";
|
||||||
|
public static final int MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT = 10;
|
||||||
|
|
||||||
|
public static final String MASTER_SNAPSHOT_OPERATIONS_THREADS =
|
||||||
|
"hbase.master.executor.snapshot.threads";
|
||||||
|
public static final int MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT = 3;
|
||||||
|
|
||||||
private HConstants() {
|
private HConstants() {
|
||||||
// Can't be instantiated with this ctor.
|
// Can't be instantiated with this ctor.
|
||||||
}
|
}
|
||||||
|
|
|
@ -200,13 +200,13 @@ public enum EventType {
|
||||||
* C_M_SNAPSHOT_TABLE<br>
|
* C_M_SNAPSHOT_TABLE<br>
|
||||||
* Client asking Master to snapshot an offline table.
|
* Client asking Master to snapshot an offline table.
|
||||||
*/
|
*/
|
||||||
C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_TABLE_OPERATIONS),
|
C_M_SNAPSHOT_TABLE (48, ExecutorType.MASTER_SNAPSHOT_OPERATIONS),
|
||||||
/**
|
/**
|
||||||
* Messages originating from Client to Master.<br>
|
* Messages originating from Client to Master.<br>
|
||||||
* C_M_RESTORE_SNAPSHOT<br>
|
* C_M_RESTORE_SNAPSHOT<br>
|
||||||
* Client asking Master to restore a snapshot.
|
* Client asking Master to restore a snapshot.
|
||||||
*/
|
*/
|
||||||
C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_TABLE_OPERATIONS),
|
C_M_RESTORE_SNAPSHOT (49, ExecutorType.MASTER_SNAPSHOT_OPERATIONS),
|
||||||
|
|
||||||
// Updates from master to ZK. This is done by the master and there is
|
// Updates from master to ZK. This is done by the master and there is
|
||||||
// nothing to process by either Master or RS
|
// nothing to process by either Master or RS
|
||||||
|
@ -308,11 +308,6 @@ public enum EventType {
|
||||||
throw new IllegalArgumentException("Unknown code " + code);
|
throw new IllegalArgumentException("Unknown code " + code);
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean isOnlineSchemaChangeSupported() {
|
|
||||||
return this.equals(EventType.C_M_ADD_FAMILY) || this.equals(EventType.C_M_DELETE_FAMILY) ||
|
|
||||||
this.equals(EventType.C_M_MODIFY_FAMILY) || this.equals(EventType.C_M_MODIFY_TABLE);
|
|
||||||
}
|
|
||||||
|
|
||||||
ExecutorType getExecutorServiceType() {
|
ExecutorType getExecutorServiceType() {
|
||||||
return this.executor;
|
return this.executor;
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,6 +34,7 @@ public enum ExecutorType {
|
||||||
MASTER_RS_SHUTDOWN (5),
|
MASTER_RS_SHUTDOWN (5),
|
||||||
MASTER_META_SERVER_OPERATIONS (6),
|
MASTER_META_SERVER_OPERATIONS (6),
|
||||||
M_LOG_REPLAY_OPS (7),
|
M_LOG_REPLAY_OPS (7),
|
||||||
|
MASTER_SNAPSHOT_OPERATIONS (8),
|
||||||
|
|
||||||
// RegionServer executor services
|
// RegionServer executor services
|
||||||
RS_OPEN_REGION (20),
|
RS_OPEN_REGION (20),
|
||||||
|
|
|
@ -1337,16 +1337,21 @@ public class HMaster extends HRegionServer implements MasterServices {
|
||||||
*/
|
*/
|
||||||
private void startServiceThreads() throws IOException {
|
private void startServiceThreads() throws IOException {
|
||||||
// Start the executor service pools
|
// Start the executor service pools
|
||||||
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION,
|
this.executorService.startExecutorService(ExecutorType.MASTER_OPEN_REGION, conf.getInt(
|
||||||
conf.getInt("hbase.master.executor.openregion.threads", 5));
|
HConstants.MASTER_OPEN_REGION_THREADS, HConstants.MASTER_OPEN_REGION_THREADS_DEFAULT));
|
||||||
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION,
|
this.executorService.startExecutorService(ExecutorType.MASTER_CLOSE_REGION, conf.getInt(
|
||||||
conf.getInt("hbase.master.executor.closeregion.threads", 5));
|
HConstants.MASTER_CLOSE_REGION_THREADS, HConstants.MASTER_CLOSE_REGION_THREADS_DEFAULT));
|
||||||
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
|
this.executorService.startExecutorService(ExecutorType.MASTER_SERVER_OPERATIONS,
|
||||||
conf.getInt("hbase.master.executor.serverops.threads", 5));
|
conf.getInt(HConstants.MASTER_SERVER_OPERATIONS_THREADS,
|
||||||
|
HConstants.MASTER_SERVER_OPERATIONS_THREADS_DEFAULT));
|
||||||
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
|
this.executorService.startExecutorService(ExecutorType.MASTER_META_SERVER_OPERATIONS,
|
||||||
conf.getInt("hbase.master.executor.meta.serverops.threads", 5));
|
conf.getInt(HConstants.MASTER_META_SERVER_OPERATIONS_THREADS,
|
||||||
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS,
|
HConstants.MASTER_META_SERVER_OPERATIONS_THREADS_DEFAULT));
|
||||||
conf.getInt("hbase.master.executor.logreplayops.threads", 10));
|
this.executorService.startExecutorService(ExecutorType.M_LOG_REPLAY_OPS, conf.getInt(
|
||||||
|
HConstants.MASTER_LOG_REPLAY_OPS_THREADS, HConstants.MASTER_LOG_REPLAY_OPS_THREADS_DEFAULT));
|
||||||
|
this.executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS,
|
||||||
|
conf.getInt(HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS,
|
||||||
|
HConstants.MASTER_SNAPSHOT_OPERATIONS_THREADS_DEFAULT));
|
||||||
|
|
||||||
// We depend on there being only one instance of this executor running
|
// We depend on there being only one instance of this executor running
|
||||||
// at a time. To do concurrency, would need fencing of enable/disable of
|
// at a time. To do concurrency, would need fencing of enable/disable of
|
||||||
|
|
|
@ -28,6 +28,7 @@ import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
@ -41,6 +42,7 @@ import org.apache.hadoop.hbase.executor.ExecutorService.Executor;
|
||||||
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
|
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorStatus;
|
||||||
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
import org.apache.hadoop.hbase.testclassification.MiscTests;
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.junit.Assert;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
@ -219,5 +221,36 @@ public class TestExecutorService {
|
||||||
executorService.shutdown();
|
executorService.shutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSnapshotHandlers() throws Exception {
|
||||||
|
final Configuration conf = HBaseConfiguration.create();
|
||||||
|
final Server server = mock(Server.class);
|
||||||
|
when(server.getConfiguration()).thenReturn(conf);
|
||||||
|
|
||||||
|
ExecutorService executorService = new ExecutorService("testSnapshotHandlers");
|
||||||
|
executorService.startExecutorService(ExecutorType.MASTER_SNAPSHOT_OPERATIONS, 1);
|
||||||
|
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
executorService.submit(new EventHandler(server, EventType.C_M_SNAPSHOT_TABLE) {
|
||||||
|
@Override
|
||||||
|
public void process() throws IOException {
|
||||||
|
try {
|
||||||
|
latch.await();
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
int activeCount = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
|
||||||
|
.getThreadPoolExecutor().getActiveCount();
|
||||||
|
Assert.assertEquals(activeCount, 1);
|
||||||
|
latch.countDown();
|
||||||
|
Waiter.waitFor(conf, 3000, () -> {
|
||||||
|
int count = executorService.getExecutor(ExecutorType.MASTER_SNAPSHOT_OPERATIONS)
|
||||||
|
.getThreadPoolExecutor().getActiveCount();
|
||||||
|
return count == 0;
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue