diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDDLMasterFailover.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDDLMasterFailover.java
new file mode 100644
index 00000000000..f460aa95660
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/IntegrationTestDDLMasterFailover.java
@@ -0,0 +1,827 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.lang.RandomStringUtils;
+import org.apache.commons.lang.math.RandomUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
+import org.apache.hadoop.hbase.testclassification.IntegrationTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.HBaseFsck;
+import org.apache.hadoop.hbase.util.Threads;
+import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
+import org.apache.hadoop.util.ToolRunner;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+/**
+ * Integration test that verifies Procedure V2 (PV2).
+ *
+ * DDL operations should go through (roll forward or roll back) when the primary master is
+ * killed by ChaosMonkey (default MASTER_KILLING).
+ *
+ * Multiple worker threads are started to perform the following actions at random, in loops.
+ *
+ * Actions generating and populating tables:
+ * - CreateTableAction
+ * - DisableTableAction
+ * - EnableTableAction
+ * - DeleteTableAction
+ * - AddRowAction
+ *
+ * Actions performing DDL operations:
+ * - AddColumnFamilyAction
+ * - AlterFamilyVersionsAction
+ * - AlterFamilyEncodingAction
+ * - DeleteColumnFamilyAction
+ *
+ * The threads run for a period of time (default 20 minutes), then are stopped at the end of
+ * the runtime. Verification is performed against these checkpoints:
+ * - No action throws an exception.
+ * - No inconsistencies are detected by hbck.
+ *
+ * This test should be run by the hbase user, since it invokes hbck at the end.
+ *
+ * Usage:
+ *  hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
+ *    -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
+ *    -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
+ *    -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
+ */
+@Category(IntegrationTests.class)
+public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {
+
+ private static final Log LOG = LogFactory.getLog(IntegrationTestDDLMasterFailover.class);
+
+ private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster
+
+ protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;
+
+ protected static final int DEFAULT_NUM_THREADS = 20;
+
+ protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables
+
+ protected HBaseCluster cluster;
+
+ protected Connection connection;
+
+ /**
+ * A soft limit on how long we should run
+ */
+ protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
+  /** Number of worker threads to run concurrently */
+  protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
+  /** Number of regions in pre-split tables */
+  protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";
+
+  /** Signals the workers to keep looping; set to false when the runtime is up. */
+  protected AtomicBoolean running = new AtomicBoolean(true);
+
+  /** Set to false halfway through the run to stop creating new tables. */
+  protected AtomicBoolean createTable = new AtomicBoolean(true);
+
+ protected int numThreads, numRegions;
+
+  ConcurrentHashMap<TableName, HTableDescriptor> enabledTables =
+      new ConcurrentHashMap<TableName, HTableDescriptor>();
+
+  ConcurrentHashMap<TableName, HTableDescriptor> disabledTables =
+      new ConcurrentHashMap<TableName, HTableDescriptor>();
+
+  ConcurrentHashMap<TableName, HTableDescriptor> deletedTables =
+      new ConcurrentHashMap<TableName, HTableDescriptor>();
+
+ @Override
+ public void setUpCluster() throws Exception {
+ util = getTestingUtil(getConf());
+ LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
+ util.initializeCluster(getMinServerCount());
+ LOG.debug("Done initializing/checking cluster");
+ cluster = util.getHBaseClusterInterface();
+ }
+
+ @Override
+ public void cleanUpCluster() throws Exception {
+ Admin admin = util.getHBaseAdmin();
+ admin.disableTables("ittable-\\d+");
+ admin.deleteTables("ittable-\\d+");
+ Connection connection = getConnection();
+ connection.close();
+ super.cleanUpCluster();
+ }
+
+ protected int getMinServerCount() {
+ return SERVER_COUNT;
+ }
+
+ protected synchronized void setConnection(Connection connection){
+ this.connection = connection;
+ }
+
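+  /** Lazily creates the shared connection on first use. */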
+ protected synchronized Connection getConnection(){
+ if (this.connection == null) {
+ try {
+ Connection connection = ConnectionFactory.createConnection(getConf());
+ setConnection(connection);
+ } catch (IOException e) {
+ LOG.fatal("Failed to establish connection.", e);
+ }
+ }
+ return connection;
+ }
+
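+  /**
+   * Asserts that every table tracked in the enabled/disabled/deleted maps is in the
+   * corresponding state on the cluster.
+   */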
+ protected void verifyTables() throws IOException{
+ Connection connection = getConnection();
+ Admin admin = connection.getAdmin();
+ // iterating concurrent map
+ for (TableName tableName : enabledTables.keySet()){
+ Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
+ admin.isTableEnabled(tableName));
+ }
+ for (TableName tableName : disabledTables.keySet()){
+ Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
+ admin.isTableDisabled(tableName));
+ }
+ for (TableName tableName : deletedTables.keySet()){
+ Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
+ admin.tableExists(tableName));
+ }
+ admin.close();
+ }
+
+ @Test
+ public void testAsUnitTest() throws Exception {
+ runTest();
+ }
+
+ @Override
+ public int runTestFromCommandLine() throws Exception {
+    return runTest();
+ }
+
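+  /** Base class for all actions; holds the shared cluster connection. */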
+ private abstract class MasterAction{
+ Connection connection = getConnection();
+
+ abstract void perform() throws IOException;
+ }
+
+ private abstract class TableAction extends MasterAction{
+    // selectTable() randomly removes and returns a table from the given map; it is shared by
+    // the table actions
+    protected HTableDescriptor selectTable(ConcurrentHashMap<TableName, HTableDescriptor> tableMap)
+    {
+      // randomly select a table from tableMap
+      if (tableMap.isEmpty()){
+        return null;
+      }
+      // synchronize to prevent concurrent removal by multiple threads
+      synchronized (tableMap){
+        ArrayList<TableName> tableList = new ArrayList<TableName>(tableMap.keySet());
+        TableName randomKey = tableList.get(RandomUtils.nextInt(tableList.size()));
+ HTableDescriptor randomHtd = tableMap.get(randomKey);
+ // remove from tableMap
+ tableMap.remove(randomKey);
+ return randomHtd;
+ }
+ }
+ }
+
+ private class CreateTableAction extends TableAction {
+
+ @Override
+ void perform() throws IOException {
+ Admin admin = connection.getAdmin();
+ try {
+ HTableDescriptor htd = createTableDesc();
+ TableName tableName = htd.getTableName();
+        if (admin.tableExists(tableName)) {
+ return;
+ }
+ String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName());
+ numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
+ byte[] startKey = Bytes.toBytes("row-0000000000");
+ byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
+ LOG.info("Creating table:" + htd);
+ admin.createTable(htd, startKey, endKey, numRegions);
+ Assert.assertTrue("Table: " + htd + " was not created", admin.tableExists(tableName));
+ HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+ enabledTables.put(tableName, freshTableDesc);
+ LOG.info("Created table:" + freshTableDesc);
+ } catch (Exception e){
+ LOG.warn("Caught exception in action: " + this.getClass());
+        // TODO workaround
+        // when master failover happens during CREATE_TABLE, the client will do an RPC retry
+        // and get TableExistsException; ignore for now till better resolution
+ if (e instanceof TableExistsException) {
+ LOG.warn("Caught TableExistsException in action: " + this.getClass(), e);
+ } else {
+ throw e;
+ }
+ } finally {
+ admin.close();
+ }
+ verifyTables();
+ }
+
+ private HTableDescriptor createTableDesc() {
+ String tableName = "ittable-" + String.format("%010d",
+ RandomUtils.nextInt(Integer.MAX_VALUE));
+ String familyName = "cf-" + Math.abs(RandomUtils.nextInt());
+ HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
+ // add random column family
+ htd.addFamily(new HColumnDescriptor(familyName));
+ return htd;
+ }
+ }
+
+ private class DisableTableAction extends TableAction {
+
+ @Override
+ void perform() throws IOException {
+
+ HTableDescriptor selected = selectTable(enabledTables);
+ if (selected == null) {
+ return;
+ }
+
+ Admin admin = connection.getAdmin();
+ try {
+ TableName tableName = selected.getTableName();
+ LOG.info("Disabling table :" + selected);
+ admin.disableTable(tableName);
+ Assert.assertTrue("Table: " + selected + " was not disabled",
+ admin.isTableDisabled(tableName));
+ HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+ disabledTables.put(tableName, freshTableDesc);
+ LOG.info("Disabled table :" + freshTableDesc);
+ } catch (Exception e){
+ LOG.warn("Caught exception in action: " + this.getClass());
+        // TODO workaround
+        // Loosen the restriction on TableNotDisabledException/TableNotEnabledException thrown
+        // by sync operations:
+        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK
+        // node in 1.x), which will be further changed to ENABLED/DISABLED once the operation
+        // completes
+        // 2) if master failover happens in the middle of the enable/disable operation, the new
+        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
+        // AssignmentManager#recoverTableInEnablingState() and
+        // AssignmentManager#recoverTableInDisablingState()
+        // 3) after the new master initialization completes, the procedure tries to re-do the
+        // enable/disable operation, which was already done. Ignore those exceptions until the
+        // behavior of AssignmentManager changes in the presence of PV2.
+        if (e instanceof TableNotEnabledException) {
+          LOG.warn("Caught TableNotEnabledException in action: " + this.getClass(), e);
+ } else {
+ throw e;
+ }
+ } finally {
+ admin.close();
+ }
+ verifyTables();
+ }
+ }
+
+ private class EnableTableAction extends TableAction {
+
+ @Override
+ void perform() throws IOException {
+
+ HTableDescriptor selected = selectTable(disabledTables);
+      if (selected == null) {
+ return;
+ }
+
+ Admin admin = connection.getAdmin();
+ try {
+ TableName tableName = selected.getTableName();
+ LOG.info("Enabling table :" + selected);
+ admin.enableTable(tableName);
+ Assert.assertTrue("Table: " + selected + " was not enabled",
+ admin.isTableEnabled(tableName));
+ HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+ enabledTables.put(tableName, freshTableDesc);
+ LOG.info("Enabled table :" + freshTableDesc);
+ } catch (Exception e){
+ LOG.warn("Caught exception in action: " + this.getClass());
+        // TODO workaround
+        // Loosen the restriction on TableNotDisabledException/TableNotEnabledException thrown
+        // by sync operations:
+        // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK
+        // node in 1.x), which will be further changed to ENABLED/DISABLED once the operation
+        // completes
+        // 2) if master failover happens in the middle of the enable/disable operation, the new
+        // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
+        // AssignmentManager#recoverTableInEnablingState() and
+        // AssignmentManager#recoverTableInDisablingState()
+        // 3) after the new master initialization completes, the procedure tries to re-do the
+        // enable/disable operation, which was already done. Ignore those exceptions until the
+        // behavior of AssignmentManager changes in the presence of PV2.
+        if (e instanceof TableNotDisabledException) {
+          LOG.warn("Caught TableNotDisabledException in action: " + this.getClass(), e);
+ } else {
+ throw e;
+ }
+ } finally {
+ admin.close();
+ }
+ verifyTables();
+ }
+ }
+
+ private class DeleteTableAction extends TableAction {
+
+ @Override
+ void perform() throws IOException {
+
+ HTableDescriptor selected = selectTable(disabledTables);
+ if (selected == null) {
+ return;
+ }
+
+ Admin admin = connection.getAdmin();
+ try {
+ TableName tableName = selected.getTableName();
+ LOG.info("Deleting table :" + selected);
+ admin.deleteTable(tableName);
+ Assert.assertFalse("Table: " + selected + " was not deleted",
+ admin.tableExists(tableName));
+ deletedTables.put(tableName, selected);
+ LOG.info("Deleted table :" + selected);
+ } catch (Exception e){
+ LOG.warn("Caught exception in action: " + this.getClass());
+        // TODO workaround
+        // when master failover happens during DELETE_TABLE, the client will do an RPC retry
+        // and get TableNotFoundException; ignore for now till better resolution
+        if (e instanceof TableNotFoundException) {
+          LOG.warn("Caught TableNotFoundException in action: " + this.getClass(), e);
+ } else {
+ throw e;
+ }
+ } finally {
+ admin.close();
+ }
+ verifyTables();
+ }
+ }
+
+
+ private abstract class ColumnAction extends TableAction{
+    // selectFamily() randomly returns a column family from the given table descriptor; it is
+    // shared by the family actions
+ protected HColumnDescriptor selectFamily(HTableDescriptor htd) {
+ if (htd == null) {
+ return null;
+ }
+ HColumnDescriptor[] families = htd.getColumnFamilies();
+ if (families.length == 0){
+ LOG.info("No column families in table: " + htd);
+ return null;
+ }
+ HColumnDescriptor randomCfd = families[RandomUtils.nextInt(families.length)];
+ return randomCfd;
+ }
+ }
+
+ private class AddColumnFamilyAction extends ColumnAction {
+
+ @Override
+ void perform() throws IOException {
+ HTableDescriptor selected = selectTable(disabledTables);
+ if (selected == null) {
+ return;
+ }
+
+ Admin admin = connection.getAdmin();
+ try {
+ HColumnDescriptor cfd = createFamilyDesc();
+ if (selected.hasFamily(cfd.getName())){
+          LOG.info(Bytes.toString(cfd.getName()) + " already exists in table "
+ + selected.getTableName());
+ return;
+ }
+ TableName tableName = selected.getTableName();
+ LOG.info("Adding column family: " + cfd + " to table: " + tableName);
+ admin.addColumn(tableName, cfd);
+ // assertion
+ HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+ Assert.assertTrue("Column family: " + cfd + " was not added",
+ freshTableDesc.hasFamily(cfd.getName()));
+ LOG.info("Added column family: " + cfd + " to table: " + tableName);
+ disabledTables.put(tableName, freshTableDesc);
+ } catch (Exception e){
+ LOG.warn("Caught exception in action: " + this.getClass());
+        // TODO HBASE-13415
+        // Loosen the restriction on InvalidFamilyOperationException thrown by async operations
+        // until HBASE-13415 completes: when failover happens, multiple procids may be created
+        // from the same request; when one procedure succeeds, the others complain that the
+        // family already exists.
+        if (e instanceof InvalidFamilyOperationException) {
+          LOG.warn("Caught InvalidFamilyOperationException in action: " + this.getClass(), e);
+ } else {
+ throw e;
+ }
+ } finally {
+ admin.close();
+ }
+ verifyTables();
+ }
+
+ private HColumnDescriptor createFamilyDesc() {
+ String familyName = "cf-" + String.format("%010d", RandomUtils.nextInt(Integer.MAX_VALUE));
+ HColumnDescriptor cfd = new HColumnDescriptor(familyName);
+ return cfd;
+ }
+ }
+
+ private class AlterFamilyVersionsAction extends ColumnAction {
+
+ @Override
+ void perform() throws IOException {
+ HTableDescriptor selected = selectTable(disabledTables);
+ if (selected == null) {
+ return;
+ }
+ HColumnDescriptor columnDesc = selectFamily(selected);
+ if (columnDesc == null){
+ return;
+ }
+
+ Admin admin = connection.getAdmin();
+ int versions = RandomUtils.nextInt(10) + 3;
+ try {
+ TableName tableName = selected.getTableName();
+ LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions +
+ " in table: " + tableName);
+ columnDesc.setMinVersions(versions);
+ columnDesc.setMaxVersions(versions);
+ admin.modifyTable(tableName, selected);
+ // assertion
+ HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+ HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
+ Assert.assertEquals("Column family: " + columnDesc + " was not altered",
+ freshColumnDesc.getMaxVersions(), versions);
+ Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered",
+ freshColumnDesc.getMinVersions(), versions);
+ LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions +
+ " in table: " + tableName);
+ disabledTables.put(tableName, freshTableDesc);
+ } catch (Exception e) {
+ LOG.warn("Caught exception in action: " + this.getClass());
+ throw e;
+ } finally {
+ admin.close();
+ }
+ verifyTables();
+ }
+ }
+
+ private class AlterFamilyEncodingAction extends ColumnAction {
+
+ @Override
+ void perform() throws IOException {
+ HTableDescriptor selected = selectTable(disabledTables);
+ if (selected == null) {
+ return;
+ }
+ HColumnDescriptor columnDesc = selectFamily(selected);
+ if (columnDesc == null){
+ return;
+ }
+
+ Admin admin = connection.getAdmin();
+ try {
+ TableName tableName = selected.getTableName();
+        // possible DataBlockEncoding ids: NONE, PREFIX, DIFF, FAST_DIFF and PREFIX_TREE
+        int[] possibleIds = {0, 2, 3, 4, 6};
+ short id = (short) possibleIds[RandomUtils.nextInt(possibleIds.length)];
+ LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id +
+ " in table: " + tableName);
+ columnDesc.setDataBlockEncoding(DataBlockEncoding.getEncodingById(id));
+ admin.modifyTable(tableName, selected);
+ // assertion
+ HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+ HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
+ Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered",
+ freshColumnDesc.getDataBlockEncoding().getId(), id);
+ LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id +
+ " in table: " + tableName);
+ disabledTables.put(tableName, freshTableDesc);
+ } catch (Exception e) {
+ LOG.warn("Caught exception in action: " + this.getClass());
+ throw e;
+ } finally {
+ admin.close();
+ }
+ verifyTables();
+ }
+ }
+
+ private class DeleteColumnFamilyAction extends ColumnAction {
+
+ @Override
+ void perform() throws IOException {
+      HTableDescriptor selected = selectTable(disabledTables);
+      if (selected == null) {
+        return;
+      }
+      HColumnDescriptor cfd = selectFamily(selected);
+      if (cfd == null) {
+        return;
+      }
+
+ Admin admin = connection.getAdmin();
+ try {
+ if (selected.getColumnFamilies().length < 2) {
+ LOG.info("No enough column families to delete in table " + selected.getTableName());
+ return;
+ }
+ TableName tableName = selected.getTableName();
+ LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
+ admin.deleteColumn(tableName, cfd.getName());
+ // assertion
+ HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+ Assert.assertFalse("Column family: " + cfd + " was not added",
+ freshTableDesc.hasFamily(cfd.getName()));
+ LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
+ disabledTables.put(tableName, freshTableDesc);
+ } catch (Exception e) {
+ LOG.warn("Caught exception in action: " + this.getClass());
+        // TODO HBASE-13415
+        // Loosen the restriction on InvalidFamilyOperationException thrown by async operations
+        // until HBASE-13415 completes: when failover happens, multiple procids may be created
+        // from the same request; when one procedure succeeds, the others complain that the
+        // family does not exist.
+        if (e instanceof InvalidFamilyOperationException) {
+          LOG.warn("Caught InvalidFamilyOperationException in action: " + this.getClass(), e);
+ } else {
+ throw e;
+ }
+ } finally {
+ admin.close();
+ }
+ verifyTables();
+ }
+ }
+
+ private class AddRowAction extends ColumnAction {
+ // populate tables
+ @Override
+ void perform() throws IOException {
+ HTableDescriptor selected = selectTable(enabledTables);
+      if (selected == null) {
+ return;
+ }
+
+ Admin admin = connection.getAdmin();
+ TableName tableName = selected.getTableName();
+ try (Table table = connection.getTable(tableName)){
+        ArrayList<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>(admin.getTableRegions(
+            selected.getTableName()));
+        int numRegions = regionInfos.size();
+        // average number of rows to be added per action to each region
+        int averageRows = 1;
+        int numRows = averageRows * numRegions;
+        LOG.info("Adding " + numRows + " rows to table: " + selected);
+        for (int i = 0; i < numRows; i++){
+          // nextInt(Integer.MAX_VALUE) returns non-negative numbers only
+          byte[] rowKey = Bytes.toBytes(
+              "row-" + String.format("%010d", RandomUtils.nextInt(Integer.MAX_VALUE)));
+          HColumnDescriptor cfd = selectFamily(selected);
+          if (cfd == null){
+            return;
+          }
+          byte[] family = cfd.getName();
+          byte[] qualifier = Bytes.toBytes("col-" + RandomUtils.nextInt(Integer.MAX_VALUE) % 10);
+          byte[] value = Bytes.toBytes("val-" + RandomStringUtils.randomAlphanumeric(10));
+          Put put = new Put(rowKey);
+          put.addColumn(family, qualifier, value);
+          table.put(put);
+        }
+        HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
+        enabledTables.put(tableName, freshTableDesc);
+        LOG.info("Added " + numRows + " rows to table: " + selected);
+ } catch (Exception e) {
+ LOG.warn("Caught exception in action: " + this.getClass());
+ throw e;
+ } finally {
+ admin.close();
+ }
+ verifyTables();
+ }
+ }
+
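+  /** The set of actions a worker randomly picks from on each loop iteration. */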
+ private enum ACTION {
+ CREATE_TABLE,
+ DISABLE_TABLE,
+ ENABLE_TABLE,
+ DELETE_TABLE,
+ ADD_COLUMNFAMILY,
+ DELETE_COLUMNFAMILY,
+ ALTER_FAMILYVERSIONS,
+ ALTER_FAMILYENCODING,
+ ADD_ROW
+ }
+
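+  /**
+   * Worker thread that repeatedly performs random actions until the runtime is up. If an
+   * action throws, the exception is saved and the thread exits so the main thread can fail
+   * the test.
+   */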
+ private class Worker extends Thread {
+
+ private Exception savedException;
+
+ private ACTION action;
+
+ @Override
+ public void run() {
+ while (running.get()) {
+ // select random action
+        // bounded nextInt() guarantees a non-negative index
+        ACTION selectedAction = ACTION.values()[RandomUtils.nextInt(ACTION.values().length)];
+ this.action = selectedAction;
+ LOG.info("Performing Action: " + selectedAction);
+
+ try {
+ switch (selectedAction) {
+ case CREATE_TABLE:
+ // stop creating new tables in the later stage of the test to avoid too many empty
+ // tables
+            if (createTable.get()) {
+ new CreateTableAction().perform();
+ }
+ break;
+ case ADD_ROW:
+ new AddRowAction().perform();
+ break;
+ case DISABLE_TABLE:
+ new DisableTableAction().perform();
+ break;
+ case ENABLE_TABLE:
+ new EnableTableAction().perform();
+ break;
+ case DELETE_TABLE:
+ // reduce probability of deleting table to 20%
+ if (RandomUtils.nextInt(100) < 20) {
+ new DeleteTableAction().perform();
+ }
+ break;
+ case ADD_COLUMNFAMILY:
+ new AddColumnFamilyAction().perform();
+ break;
+ case DELETE_COLUMNFAMILY:
+ // reduce probability of deleting column family to 20%
+ if (RandomUtils.nextInt(100) < 20) {
+ new DeleteColumnFamilyAction().perform();
+ }
+ break;
+ case ALTER_FAMILYVERSIONS:
+ new AlterFamilyVersionsAction().perform();
+ break;
+ case ALTER_FAMILYENCODING:
+ new AlterFamilyEncodingAction().perform();
+ break;
+ }
+ } catch (Exception ex) {
+ this.savedException = ex;
+ return;
+ }
+ }
+ LOG.info(this.getName() + " stopped");
+ }
+
+ public Exception getSavedException(){
+ return this.savedException;
+ }
+
+ public ACTION getAction(){
+ return this.action;
+ }
+ }
+
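+  /** Fails the test if any worker saved an exception from a failed action. */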
+  private void checkException(List<Worker> workers){
+    if (workers == null || workers.isEmpty()) {
+      return;
+    }
+    for (Worker worker : workers){
+      Exception e = worker.getSavedException();
+      if (e != null) {
+        LOG.error("Found exception in thread: " + worker.getName(), e);
+      }
+ Assert.assertNull("Action failed: " + worker.getAction() + " in thread: "
+ + worker.getName(), e);
+ }
+ }
+
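+  /**
+   * Starts the worker threads, lets them run for the configured runtime (halting table
+   * creation halfway through), then checks for worker exceptions, verifies table states,
+   * and runs hbck.
+   */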
+ private int runTest() throws Exception {
+ LOG.info("Starting the test");
+
+ String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
+ long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);
+
+ String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
+ numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);
+
+ ArrayList workers = new ArrayList<>();
+ for (int i = 0; i < numThreads; i++) {
+ checkException(workers);
+ Worker worker = new Worker();
+ LOG.info("Launching worker thread " + worker.getName());
+ workers.add(worker);
+ worker.start();
+ }
+
+ Threads.sleep(runtime / 2);
+ LOG.info("Stopping creating new tables");
+    createTable.set(false);
+ Threads.sleep(runtime / 2);
+ LOG.info("Runtime is up");
+ running.set(false);
+
+ checkException(workers);
+
+ for (Worker worker : workers) {
+ worker.join();
+ }
+ LOG.info("All Worker threads stopped");
+
+ // verify
+ LOG.info("Verify actions of all threads succeeded");
+ checkException(workers);
+ LOG.info("Verify states of all tables");
+ verifyTables();
+
+ // RUN HBCK
+
+ HBaseFsck hbck = null;
+ try {
+ LOG.info("Running hbck");
+ hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
+ HbckTestingUtil.assertNoErrors(hbck);
+ LOG.info("Finished hbck");
+ } finally {
+ if (hbck != null) {
+ hbck.close();
+ }
+ }
+ return 0;
+ }
+
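+  // This test creates and manages its own tables, so there is no single table name or
+  // column family set to report to the framework.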
+ @Override
+ public TableName getTablename() {
+ return null;
+ }
+
+ @Override
+  protected Set<String> getColumnFamilies() {
+ return null;
+ }
+
+ public static void main(String[] args) throws Exception {
+ Configuration conf = HBaseConfiguration.create();
+ IntegrationTestingUtility.setUseDistributedCluster(conf);
+ IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
+ Connection connection = null;
+ int ret = 1;
+ try {
+ // Initialize connection once, then pass to Actions
+ LOG.debug("Setting up connection ...");
+ connection = ConnectionFactory.createConnection(conf);
+ masterFailover.setConnection(connection);
+ ret = ToolRunner.run(conf, masterFailover, args);
+ } catch (IOException e){
+ LOG.fatal("Failed to establish connection. Aborting test ...", e);
+ } finally {
+ connection = masterFailover.getConnection();
+ if (connection != null){
+ connection.close();
+ }
+ System.exit(ret);
+ }
+ }
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MasterKillingMonkeyFactory.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MasterKillingMonkeyFactory.java
new file mode 100644
index 00000000000..52dec3beb3c
--- /dev/null
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MasterKillingMonkeyFactory.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hbase.chaos.factories;
+
+import org.apache.hadoop.hbase.chaos.actions.Action;
+import org.apache.hadoop.hbase.chaos.actions.DumpClusterStatusAction;
+import org.apache.hadoop.hbase.chaos.actions.RestartActiveMasterAction;
+import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey;
+import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
+import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
+
+/**
+ * A chaos monkey that kills the active master periodically. It can be run against a
+ * single-master or multi-master setup.
+ */
+public class MasterKillingMonkeyFactory extends MonkeyFactory {
+
+ private long action1Period;
+ private long action2Period;
+
+ private long restartActiveMasterSleepTime;
+
+ @Override
+ public ChaosMonkey build() {
+ loadProperties();
+
+ // Destructive actions to mess things around.
+ Action[] actions1 = new Action[] {
+ new RestartActiveMasterAction(restartActiveMasterSleepTime),
+ };
+
+ // Action to log more info for debugging
+ Action[] actions2 = new Action[] {
+ new DumpClusterStatusAction()
+ };
+
+ return new PolicyBasedChaosMonkey(util,
+ new PeriodicRandomActionPolicy(action1Period, actions1),
+ new PeriodicRandomActionPolicy(action2Period, actions2));
+ }
+
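+  /** Reads the action periods and the master restart sleep time from the monkey properties. */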
+ private void loadProperties() {
+
+ action1Period = Long.parseLong(this.properties.getProperty(
+ MonkeyConstants.PERIODIC_ACTION1_PERIOD,
+ MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD + ""));
+ action2Period = Long.parseLong(this.properties.getProperty(
+ MonkeyConstants.PERIODIC_ACTION2_PERIOD,
+ MonkeyConstants.DEFAULT_PERIODIC_ACTION2_PERIOD + ""));
+ restartActiveMasterSleepTime = Long.parseLong(this.properties.getProperty(
+ MonkeyConstants.RESTART_ACTIVE_MASTER_SLEEP_TIME,
+ MonkeyConstants.DEFAULT_RESTART_ACTIVE_MASTER_SLEEP_TIME + ""));
+ }
+
+}
diff --git a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
index f4b1c5306db..2f652512eb7 100644
--- a/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
+++ b/hbase-it/src/test/java/org/apache/hadoop/hbase/chaos/factories/MonkeyFactory.java
@@ -69,6 +69,7 @@ public abstract class MonkeyFactory {
public static final String SERVER_KILLING = "serverKilling";
public static final String STRESS_AM = "stressAM";
public static final String NO_KILL = "noKill";
+ public static final String MASTER_KILLING = "masterKilling";
 public static Map<String, MonkeyFactory> FACTORIES = ImmutableMap.<String, MonkeyFactory>builder()
.put(CALM, new CalmMonkeyFactory())
@@ -77,6 +78,7 @@ public abstract class MonkeyFactory {
.put(SERVER_KILLING, new ServerKillingMonkeyFactory())
.put(STRESS_AM, new StressAssignmentManagerMonkeyFactory())
.put(NO_KILL, new NoKillMonkeyFactory())
+ .put(MASTER_KILLING, new MasterKillingMonkeyFactory())
.build();
public static MonkeyFactory getFactory(String factoryName) {