HBASE-13470 High level Integration test for master DDL operations (Sophia Feng)
This commit is contained in:
parent
f469c3bd97
commit
1b5ad45b85
|
@ -0,0 +1,827 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.io.encoding.DataBlockEncoding;
|
||||||
|
import org.apache.commons.lang.RandomStringUtils;
|
||||||
|
import org.apache.commons.lang.math.RandomUtils;
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.client.*;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.IntegrationTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.HBaseFsck;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.apache.hadoop.hbase.util.hbck.HbckTestingUtil;
|
||||||
|
import org.apache.hadoop.util.ToolRunner;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
* Integration test that verifies Procedure V2. <br/><br/>
|
||||||
|
*
|
||||||
|
* DDL operations should go through (rollforward or rollback) when primary master is killed by
|
||||||
|
* ChaosMonkey (default MASTER_KILLING)<br/><br/>
|
||||||
|
*
|
||||||
|
* Multiple Worker threads are started to randomly do the following Actions in loops:<br/>
|
||||||
|
* Actions generating and populating tables:
|
||||||
|
* <ul>
|
||||||
|
* <li>CreateTableAction</li>
|
||||||
|
* <li>DisableTableAction</li>
|
||||||
|
* <li>EnableTableAction</li>
|
||||||
|
* <li>DeleteTableAction</li>
|
||||||
|
* <li>AddRowAction</li>
|
||||||
|
* </ul>
|
||||||
|
* Actions performing DDL operations:
|
||||||
|
* <ul>
|
||||||
|
* <li>AddColumnFamilyAction</li>
|
||||||
|
* <li>AlterColumnFamilyVersionsAction</li>
|
||||||
|
* <li>AlterColumnFamilyEncodingAction</li>
|
||||||
|
* <li>DeleteColumnFamilyAction</li>
|
||||||
|
* </ul>
|
||||||
|
* <br/>
|
||||||
|
*
|
||||||
|
* The threads run for a period of time (default 20 minutes) then are stopped at the end of
|
||||||
|
* runtime. Verification is performed towards those checkpoints:
|
||||||
|
* <ol>
|
||||||
|
* <li>No Actions throw Exceptions.</li>
|
||||||
|
* <li>No inconsistencies are detected in hbck.</li>
|
||||||
|
* </ol>
|
||||||
|
*
|
||||||
|
* <p>
|
||||||
|
* This test should be run by the hbase user since it invokes hbck at the end
|
||||||
|
* </p><p>
|
||||||
|
* Usage:
|
||||||
|
* hbase org.apache.hadoop.hbase.IntegrationTestDDLMasterFailover
|
||||||
|
* -Dhbase.IntegrationTestDDLMasterFailover.runtime=1200000
|
||||||
|
* -Dhbase.IntegrationTestDDLMasterFailover.numThreads=20
|
||||||
|
* -Dhbase.IntegrationTestDDLMasterFailover.numRegions=50 --monkey masterKilling
|
||||||
|
*/
|
||||||
|
|
||||||
|
@Category(IntegrationTests.class)
|
||||||
|
public class IntegrationTestDDLMasterFailover extends IntegrationTestBase {
|
||||||
|
|
||||||
|
private static final Log LOG = LogFactory.getLog(IntegrationTestDDLMasterFailover.class);

private static final int SERVER_COUNT = 1; // number of slaves for the smallest cluster

// Defaults for the tunables below; overridable via hbase.<SimpleClassName>.* properties.
protected static final long DEFAULT_RUN_TIME = 20 * 60 * 1000;

protected static final int DEFAULT_NUM_THREADS = 20;

protected static final int DEFAULT_NUM_REGIONS = 50; // number of regions in pre-split tables

// Handle to the (possibly distributed) cluster under test, set in setUpCluster().
protected HBaseCluster cluster;

// Shared connection, lazily created by getConnection(); guarded by synchronized accessors.
protected Connection connection;

/**
 * Configuration key patterns ("%s" is the test's simple class name):
 * runtime is a soft limit on how long we should run, numThreads is the worker
 * count, numRegions the pre-split region count for created tables.
 */
protected static final String RUN_TIME_KEY = "hbase.%s.runtime";
protected static final String NUM_THREADS_KEY = "hbase.%s.numThreads";
protected static final String NUM_REGIONS_KEY = "hbase.%s.numRegions";

// Master switch: flipped to false at end of runtime to stop all Worker loops.
protected AtomicBoolean running = new AtomicBoolean(true);

// Flipped to false halfway through the run to stop creating new tables.
protected AtomicBoolean create_table = new AtomicBoolean(true);

protected int numThreads, numRegions;

// Bookkeeping maps shared across Worker threads; a table lives in exactly one
// map at a time. selectTable() removes an entry while an action operates on it.
ConcurrentHashMap<TableName, HTableDescriptor> enabledTables =
    new ConcurrentHashMap<TableName, HTableDescriptor>();

ConcurrentHashMap<TableName, HTableDescriptor> disabledTables =
    new ConcurrentHashMap<TableName, HTableDescriptor>();

ConcurrentHashMap<TableName, HTableDescriptor> deletedTables =
    new ConcurrentHashMap<TableName, HTableDescriptor>();
|
||||||
|
|
||||||
|
@Override
public void setUpCluster() throws Exception {
  // Initialize (or verify) a cluster with at least getMinServerCount() slaves
  // and cache the cluster handle for later use.
  util = getTestingUtil(getConf());
  LOG.debug("Initializing/checking cluster has " + SERVER_COUNT + " servers");
  util.initializeCluster(getMinServerCount());
  LOG.debug("Done initializing/checking cluster");
  cluster = util.getHBaseClusterInterface();
}
|
||||||
|
|
||||||
|
@Override
public void cleanUpCluster() throws Exception {
  // Drop every table this test created (names match "ittable-<digits>"),
  // then close the shared connection before the superclass teardown.
  Admin admin = util.getHBaseAdmin();
  admin.disableTables("ittable-\\d+");
  admin.deleteTables("ittable-\\d+");
  // NOTE(review): this closes the shared connection but leaves the field
  // non-null; getConnection() would hand out the closed instance afterwards.
  // Safe here because cleanup runs last — verify if teardown order changes.
  Connection connection = getConnection();
  connection.close();
  super.cleanUpCluster();
}
|
||||||
|
|
||||||
|
/**
 * Minimum number of slave servers required; subclasses may override.
 */
protected int getMinServerCount() {
  return SERVER_COUNT;
}
|
||||||
|
|
||||||
|
/**
 * Stores the shared connection; synchronized to pair with getConnection().
 */
protected synchronized void setConnection(Connection connection){
  this.connection = connection;
}
|
||||||
|
|
||||||
|
/**
 * Lazily creates and returns the shared cluster connection.
 *
 * NOTE(review): if ConnectionFactory.createConnection throws, the IOException
 * is only logged and this method returns null — callers will then NPE on the
 * first use. Confirm whether failing fast would be preferable here.
 */
protected synchronized Connection getConnection(){
  if (this.connection == null) {
    try {
      Connection connection = ConnectionFactory.createConnection(getConf());
      setConnection(connection);
    } catch (IOException e) {
      LOG.fatal("Failed to establish connection.", e);
    }
  }
  return connection;
}
|
||||||
|
|
||||||
|
protected void verifyTables() throws IOException{
|
||||||
|
Connection connection = getConnection();
|
||||||
|
Admin admin = connection.getAdmin();
|
||||||
|
// iterating concurrent map
|
||||||
|
for (TableName tableName : enabledTables.keySet()){
|
||||||
|
Assert.assertTrue("Table: " + tableName + " in enabledTables is not enabled",
|
||||||
|
admin.isTableEnabled(tableName));
|
||||||
|
}
|
||||||
|
for (TableName tableName : disabledTables.keySet()){
|
||||||
|
Assert.assertTrue("Table: " + tableName + " in disabledTables is not disabled",
|
||||||
|
admin.isTableDisabled(tableName));
|
||||||
|
}
|
||||||
|
for (TableName tableName : deletedTables.keySet()){
|
||||||
|
Assert.assertFalse("Table: " + tableName + " in deletedTables is not deleted",
|
||||||
|
admin.tableExists(tableName));
|
||||||
|
}
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * JUnit entry point; delegates to the shared test driver.
 */
@Test
public void testAsUnitTest() throws Exception {
  runTest();
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int runTestFromCommandLine() throws Exception {
|
||||||
|
int ret = runTest();
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Base class for all randomized actions; caches the shared cluster
 * connection for subclasses to use.
 */
private abstract class MasterAction{
  Connection connection = getConnection();

  // Performs one randomized DDL/DML operation against the cluster.
  abstract void perform() throws IOException;
}
|
||||||
|
|
||||||
|
private abstract class TableAction extends MasterAction{
|
||||||
|
// TableAction has implemented selectTable() shared by multiple table Actions
|
||||||
|
protected HTableDescriptor selectTable(ConcurrentHashMap<TableName, HTableDescriptor> tableMap)
|
||||||
|
{
|
||||||
|
// randomly select table from tableMap
|
||||||
|
if (tableMap.isEmpty()){
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
// synchronization to prevent removal from multiple threads
|
||||||
|
synchronized (tableMap){
|
||||||
|
ArrayList<TableName> tableList = new ArrayList<TableName>(tableMap.keySet());
|
||||||
|
TableName randomKey = tableList.get(RandomUtils.nextInt(tableList.size()));
|
||||||
|
HTableDescriptor randomHtd = tableMap.get(randomKey);
|
||||||
|
// remove from tableMap
|
||||||
|
tableMap.remove(randomKey);
|
||||||
|
return randomHtd;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Creates a random pre-split table ("ittable-<10 digits>") with one random
 * column family, registers it in enabledTables, then verifies all maps.
 */
private class CreateTableAction extends TableAction {

  @Override
  void perform() throws IOException {
    Admin admin = connection.getAdmin();
    try {
      HTableDescriptor htd = createTableDesc();
      TableName tableName = htd.getTableName();
      // another worker may have raced us to this random name
      if ( admin.tableExists(tableName)){
        return;
      }
      String numRegionKey = String.format(NUM_REGIONS_KEY, this.getClass().getSimpleName());
      numRegions = getConf().getInt(numRegionKey, DEFAULT_NUM_REGIONS);
      // pre-split across the whole positive "row-<n>" keyspace
      byte[] startKey = Bytes.toBytes("row-0000000000");
      byte[] endKey = Bytes.toBytes("row-" + Integer.MAX_VALUE);
      LOG.info("Creating table:" + htd);
      admin.createTable(htd, startKey, endKey, numRegions);
      Assert.assertTrue("Table: " + htd + " was not created", admin.tableExists(tableName));
      // re-read the descriptor from the master so bookkeeping holds the canonical copy
      HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
      enabledTables.put(tableName, freshTableDesc);
      LOG.info("Created table:" + freshTableDesc);
    } catch (Exception e){
      LOG.warn("Caught exception in action: " + this.getClass());
      // TODO workaround
      // when master failover happens during CREATE_TABLE, client will do RPC retry and get TableExistsException
      // ignore for now till better resolution
      if (e instanceof TableExistsException) {
        LOG.warn("Caught TableExistsException in action: " + this.getClass(), e);
      } else {
        throw e;
      }
    } finally {
      admin.close();
    }
    verifyTables();
  }

  /**
   * Builds a descriptor with a random "ittable-" name and one random
   * "cf-" column family.
   */
  private HTableDescriptor createTableDesc() {
    String tableName = "ittable-" + String.format("%010d",
      RandomUtils.nextInt(Integer.MAX_VALUE));
    String familyName = "cf-" + Math.abs(RandomUtils.nextInt());
    HTableDescriptor htd = new HTableDescriptor(TableName.valueOf(tableName));
    // add random column family
    htd.addFamily(new HColumnDescriptor(familyName));
    return htd;
  }
}
|
||||||
|
|
||||||
|
/**
 * Disables a randomly selected enabled table and moves it from
 * enabledTables to disabledTables.
 */
private class DisableTableAction extends TableAction {

  @Override
  void perform() throws IOException {

    // selectTable() removes the table from enabledTables so other workers
    // cannot act on it concurrently
    HTableDescriptor selected = selectTable(enabledTables);
    if (selected == null) {
      return;
    }

    Admin admin = connection.getAdmin();
    try {
      TableName tableName = selected.getTableName();
      LOG.info("Disabling table :" + selected);
      admin.disableTable(tableName);
      Assert.assertTrue("Table: " + selected + " was not disabled",
          admin.isTableDisabled(tableName));
      HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
      disabledTables.put(tableName, freshTableDesc);
      LOG.info("Disabled table :" + freshTableDesc);
    } catch (Exception e){
      LOG.warn("Caught exception in action: " + this.getClass());
      // TODO workaround
      // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
      // operations
      // 1) when enable/disable starts, the table state is changed to ENABLING/DISABLING (ZK node
      // in 1.x), which will be further changed to ENABLED/DISABLED once the operation completes
      // 2) if master failover happens in the middle of the enable/disable operation, the new
      // master will try to recover the tables in ENABLING/DISABLING state, as programmed in
      // AssignmentManager#recoverTableInEnablingState() and
      // AssignmentManager#recoverTableInDisablingState()
      // 3) after the new master initialization completes, the procedure tries to re-do the
      // enable/disable operation, which was already done. Ignore those exceptions before change
      // of behaviors of AssignmentManager in presence of PV2
      // NOTE(review): when the exception is swallowed, the table ends up in
      // neither tracking map — verify this is acceptable for the workaround.
      if (e instanceof TableNotEnabledException) {
        LOG.warn("Caught TableNotEnabledException in action: " + this.getClass());
        e.printStackTrace();
      } else {
        throw e;
      }
    } finally {
      admin.close();
    }
    verifyTables();
  }
}
|
||||||
|
|
||||||
|
/**
 * Enables a randomly selected disabled table and moves it from
 * disabledTables to enabledTables.
 */
private class EnableTableAction extends TableAction {

  @Override
  void perform() throws IOException {

    // selectTable() removes the table from disabledTables so other workers
    // cannot act on it concurrently
    HTableDescriptor selected = selectTable(disabledTables);
    if (selected == null ) {
      return;
    }

    Admin admin = connection.getAdmin();
    try {
      TableName tableName = selected.getTableName();
      LOG.info("Enabling table :" + selected);
      admin.enableTable(tableName);
      Assert.assertTrue("Table: " + selected + " was not enabled",
          admin.isTableEnabled(tableName));
      HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
      enabledTables.put(tableName, freshTableDesc);
      LOG.info("Enabled table :" + freshTableDesc);
    } catch (Exception e){
      LOG.warn("Caught exception in action: " + this.getClass());
      // TODO workaround
      // loose restriction for TableNotDisabledException/TableNotEnabledException thrown in sync
      // operations 1) when enable/disable starts, the table state is changed to
      // ENABLING/DISABLING (ZK node in 1.x), which will be further changed to ENABLED/DISABLED
      // once the operation completes 2) if master failover happens in the middle of the
      // enable/disable operation, the new master will try to recover the tables in
      // ENABLING/DISABLING state, as programmed in
      // AssignmentManager#recoverTableInEnablingState() and
      // AssignmentManager#recoverTableInDisablingState()
      // 3) after the new master initialization completes, the procedure tries to re-do the
      // enable/disable operation, which was already done. Ignore those exceptions before
      // change of behaviors of AssignmentManager in presence of PV2
      // NOTE(review): when the exception is swallowed, the table ends up in
      // neither tracking map — verify this is acceptable for the workaround.
      if (e instanceof TableNotDisabledException) {
        LOG.warn("Caught TableNotDisabledException in action: " + this.getClass());
        e.printStackTrace();
      } else {
        throw e;
      }
    } finally {
      admin.close();
    }
    verifyTables();
  }
}
|
||||||
|
|
||||||
|
/**
 * Deletes a randomly selected disabled table and records it in
 * deletedTables for later verification.
 */
private class DeleteTableAction extends TableAction {

  @Override
  void perform() throws IOException {

    // only disabled tables can be deleted; selectTable() also removes the
    // entry from disabledTables
    HTableDescriptor selected = selectTable(disabledTables);
    if (selected == null) {
      return;
    }

    Admin admin = connection.getAdmin();
    try {
      TableName tableName = selected.getTableName();
      LOG.info("Deleting table :" + selected);
      admin.deleteTable(tableName);
      Assert.assertFalse("Table: " + selected + " was not deleted",
          admin.tableExists(tableName));
      deletedTables.put(tableName, selected);
      LOG.info("Deleted table :" + selected);
    } catch (Exception e){
      LOG.warn("Caught exception in action: " + this.getClass());
      // TODO workaround
      // when master failover happens during DELETE_TABLE, client will do RPC retry and get
      // TableNotFoundException ignore for now till better resolution
      if (e instanceof TableNotFoundException) {
        LOG.warn("Caught TableNotFoundException in action: " + this.getClass());
        e.printStackTrace();
      } else {
        throw e;
      }
    } finally {
      admin.close();
    }
    verifyTables();
  }
}
|
||||||
|
|
||||||
|
|
||||||
|
private abstract class ColumnAction extends TableAction{
|
||||||
|
// ColumnAction has implemented selectFamily() shared by multiple family Actions
|
||||||
|
protected HColumnDescriptor selectFamily(HTableDescriptor htd) {
|
||||||
|
if (htd == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
HColumnDescriptor[] families = htd.getColumnFamilies();
|
||||||
|
if (families.length == 0){
|
||||||
|
LOG.info("No column families in table: " + htd);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
HColumnDescriptor randomCfd = families[RandomUtils.nextInt(families.length)];
|
||||||
|
return randomCfd;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private class AddColumnFamilyAction extends ColumnAction {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void perform() throws IOException {
|
||||||
|
HTableDescriptor selected = selectTable(disabledTables);
|
||||||
|
if (selected == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Admin admin = connection.getAdmin();
|
||||||
|
try {
|
||||||
|
HColumnDescriptor cfd = createFamilyDesc();
|
||||||
|
if (selected.hasFamily(cfd.getName())){
|
||||||
|
LOG.info(new String(cfd.getName()) + " already exists in table "
|
||||||
|
+ selected.getTableName());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
TableName tableName = selected.getTableName();
|
||||||
|
LOG.info("Adding column family: " + cfd + " to table: " + tableName);
|
||||||
|
admin.addColumn(tableName, cfd);
|
||||||
|
// assertion
|
||||||
|
HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
|
||||||
|
Assert.assertTrue("Column family: " + cfd + " was not added",
|
||||||
|
freshTableDesc.hasFamily(cfd.getName()));
|
||||||
|
LOG.info("Added column family: " + cfd + " to table: " + tableName);
|
||||||
|
disabledTables.put(tableName, freshTableDesc);
|
||||||
|
} catch (Exception e){
|
||||||
|
LOG.warn("Caught exception in action: " + this.getClass());
|
||||||
|
// TODO HBASE-13415
|
||||||
|
// loose restriction for InvalidFamilyOperationException thrown in async operations before
|
||||||
|
// HBASE-13415 completes when failover happens, multiple procids may be created from the
|
||||||
|
// same request when 1 procedure succeeds, the others would complain about family already
|
||||||
|
// exists
|
||||||
|
if (e instanceof InvalidFamilyOperationException) {
|
||||||
|
LOG.warn("Caught InvalidFamilyOperationException in action: " + this.getClass());
|
||||||
|
e.printStackTrace();
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
verifyTables();
|
||||||
|
}
|
||||||
|
|
||||||
|
private HColumnDescriptor createFamilyDesc() {
|
||||||
|
String familyName = "cf-" + String.format("%010d", RandomUtils.nextInt(Integer.MAX_VALUE));
|
||||||
|
HColumnDescriptor cfd = new HColumnDescriptor(familyName);
|
||||||
|
return cfd;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Sets min and max versions of a random column family of a random disabled
 * table to the same random value (3..12) and verifies the change stuck.
 */
private class AlterFamilyVersionsAction extends ColumnAction {

  @Override
  void perform() throws IOException {
    HTableDescriptor selected = selectTable(disabledTables);
    if (selected == null) {
      return;
    }
    HColumnDescriptor columnDesc = selectFamily(selected);
    if (columnDesc == null){
      // NOTE(review): on this early return the table stays out of
      // disabledTables — verify that losing it from bookkeeping is intended
      return;
    }

    Admin admin = connection.getAdmin();
    // random versions in [3, 12]
    int versions = RandomUtils.nextInt(10) + 3;
    try {
      TableName tableName = selected.getTableName();
      LOG.info("Altering versions of column family: " + columnDesc + " to: " + versions +
          " in table: " + tableName);
      // mutate the in-memory descriptor, then push the whole table descriptor
      columnDesc.setMinVersions(versions);
      columnDesc.setMaxVersions(versions);
      admin.modifyTable(tableName, selected);
      // assertion
      HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
      HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
      Assert.assertEquals("Column family: " + columnDesc + " was not altered",
          freshColumnDesc.getMaxVersions(), versions);
      Assert.assertEquals("Column family: " + freshColumnDesc + " was not altered",
          freshColumnDesc.getMinVersions(), versions);
      LOG.info("Altered versions of column family: " + columnDesc + " to: " + versions +
          " in table: " + tableName);
      disabledTables.put(tableName, freshTableDesc);
    } catch (Exception e) {
      LOG.warn("Caught exception in action: " + this.getClass());
      throw e;
    } finally {
      admin.close();
    }
    verifyTables();
  }
}
|
||||||
|
|
||||||
|
/**
 * Switches the data block encoding of a random column family of a random
 * disabled table to a random supported encoding and verifies the change.
 */
private class AlterFamilyEncodingAction extends ColumnAction {

  @Override
  void perform() throws IOException {
    HTableDescriptor selected = selectTable(disabledTables);
    if (selected == null) {
      return;
    }
    HColumnDescriptor columnDesc = selectFamily(selected);
    if (columnDesc == null){
      // NOTE(review): on this early return the table stays out of
      // disabledTables — verify that losing it from bookkeeping is intended
      return;
    }

    Admin admin = connection.getAdmin();
    try {
      TableName tableName = selected.getTableName();
      // possible DataBlockEncoding ids
      int[] possibleIds = {0, 2, 3, 4, 6};
      short id = (short) possibleIds[RandomUtils.nextInt(possibleIds.length)];
      LOG.info("Altering encoding of column family: " + columnDesc + " to: " + id +
          " in table: " + tableName);
      columnDesc.setDataBlockEncoding(DataBlockEncoding.getEncodingById(id));
      admin.modifyTable(tableName, selected);
      // assertion
      HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
      HColumnDescriptor freshColumnDesc = freshTableDesc.getFamily(columnDesc.getName());
      Assert.assertEquals("Encoding of column family: " + columnDesc + " was not altered",
          freshColumnDesc.getDataBlockEncoding().getId(), id);
      LOG.info("Altered encoding of column family: " + freshColumnDesc + " to: " + id +
          " in table: " + tableName);
      disabledTables.put(tableName, freshTableDesc);
    } catch (Exception e) {
      LOG.warn("Caught exception in action: " + this.getClass());
      throw e;
    } finally {
      admin.close();
    }
    verifyTables();
  }
}
|
||||||
|
|
||||||
|
private class DeleteColumnFamilyAction extends ColumnAction {
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void perform() throws IOException {
|
||||||
|
HTableDescriptor selected = selectTable(disabledTables);
|
||||||
|
HColumnDescriptor cfd = selectFamily(selected);
|
||||||
|
if (selected == null || cfd == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
Admin admin = connection.getAdmin();
|
||||||
|
try {
|
||||||
|
if (selected.getColumnFamilies().length < 2) {
|
||||||
|
LOG.info("No enough column families to delete in table " + selected.getTableName());
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
TableName tableName = selected.getTableName();
|
||||||
|
LOG.info("Deleting column family: " + cfd + " from table: " + tableName);
|
||||||
|
admin.deleteColumn(tableName, cfd.getName());
|
||||||
|
// assertion
|
||||||
|
HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
|
||||||
|
Assert.assertFalse("Column family: " + cfd + " was not added",
|
||||||
|
freshTableDesc.hasFamily(cfd.getName()));
|
||||||
|
LOG.info("Deleted column family: " + cfd + " from table: " + tableName);
|
||||||
|
disabledTables.put(tableName, freshTableDesc);
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("Caught exception in action: " + this.getClass());
|
||||||
|
// TODO HBASE-13415
|
||||||
|
// loose restriction for InvalidFamilyOperationException thrown in async operations before
|
||||||
|
// HBASE-13415 completes when failover happens, multiple procids may be created from the
|
||||||
|
// same request when 1 procedure succeeds, the others would complain about family not
|
||||||
|
// exists
|
||||||
|
if (e instanceof InvalidFamilyOperationException) {
|
||||||
|
LOG.warn("Caught InvalidFamilyOperationException in action: " + this.getClass());
|
||||||
|
e.printStackTrace();
|
||||||
|
} else {
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
admin.close();
|
||||||
|
}
|
||||||
|
verifyTables();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Populates a random enabled table: writes roughly one random row per region
 * into randomly chosen column families.
 */
private class AddRowAction extends ColumnAction {
  // populate tables
  @Override
  void perform() throws IOException {
    HTableDescriptor selected = selectTable(enabledTables);
    if (selected == null ) {
      return;
    }

    Admin admin = connection.getAdmin();
    TableName tableName = selected.getTableName();
    try (Table table = connection.getTable(tableName)){
      ArrayList<HRegionInfo> regionInfos = new ArrayList<HRegionInfo>(admin.getTableRegions(
          selected.getTableName()));
      int numRegions = regionInfos.size();
      // average number of rows to be added per action to each region
      int average_rows = 1;
      int numRows = average_rows * numRegions;
      LOG.info("Adding " + numRows + " rows to table: " + selected);
      for (int i = 0; i < numRows; i++){
        // nextInt(Integer.MAX_VALUE)) to return positive numbers only
        byte[] rowKey = Bytes.toBytes(
            "row-" + String.format("%010d", RandomUtils.nextInt(Integer.MAX_VALUE)));
        HColumnDescriptor cfd = selectFamily(selected);
        if (cfd == null){
          // NOTE(review): returning here (via the finally block) leaves the
          // table out of enabledTables — verify this bookkeeping loss is intended
          return;
        }
        byte[] family = cfd.getName();
        byte[] qualifier = Bytes.toBytes("col-" + RandomUtils.nextInt(Integer.MAX_VALUE) % 10);
        byte[] value = Bytes.toBytes("val-" + RandomStringUtils.randomAlphanumeric(10));
        Put put = new Put(rowKey);
        put.addColumn(family, qualifier, value);
        table.put(put);
      }
      // refresh the tracked descriptor and return the table to the enabled pool
      HTableDescriptor freshTableDesc = admin.getTableDescriptor(tableName);
      enabledTables.put(tableName, freshTableDesc);
      LOG.info("Added " + numRows + " rows to table: " + selected);
    } catch (Exception e) {
      LOG.warn("Caught exception in action: " + this.getClass());
      throw e;
    } finally {
      admin.close();
    }
    verifyTables();
  }
}
|
||||||
|
|
||||||
|
/**
 * The set of randomized operations a Worker may perform each loop iteration.
 * DELETE_TABLE and DELETE_COLUMNFAMILY are further down-weighted in
 * Worker#run(), and CREATE_TABLE is gated by the create_table flag.
 */
private enum ACTION {
  CREATE_TABLE,
  DISABLE_TABLE,
  ENABLE_TABLE,
  DELETE_TABLE,
  ADD_COLUMNFAMILY,
  DELETE_COLUMNFAMILY,
  ALTER_FAMILYVERSIONS,
  ALTER_FAMILYENCODING,
  ADD_ROW
}
|
||||||
|
|
||||||
|
/**
 * Worker thread: repeatedly performs a random ACTION until the shared
 * running flag is cleared. The first exception thrown by an action is saved
 * (together with the action that caused it) and ends the thread; the main
 * test fails later via checkException().
 */
private class Worker extends Thread {

  // First exception thrown by an action in this thread, if any.
  private Exception savedException;

  // Most recent action attempted, reported alongside savedException.
  private ACTION action;

  @Override
  public void run() {
    while (running.get()) {
      // select random action
      ACTION selectedAction = ACTION.values()[RandomUtils.nextInt() % ACTION.values().length];
      this.action = selectedAction;
      LOG.info("Performing Action: " + selectedAction);

      try {
        switch (selectedAction) {
        case CREATE_TABLE:
          // stop creating new tables in the later stage of the test to avoid too many empty
          // tables
          if (create_table.get()) {
            new CreateTableAction().perform();
          }
          break;
        case ADD_ROW:
          new AddRowAction().perform();
          break;
        case DISABLE_TABLE:
          new DisableTableAction().perform();
          break;
        case ENABLE_TABLE:
          new EnableTableAction().perform();
          break;
        case DELETE_TABLE:
          // reduce probability of deleting table to 20%
          if (RandomUtils.nextInt(100) < 20) {
            new DeleteTableAction().perform();
          }
          break;
        case ADD_COLUMNFAMILY:
          new AddColumnFamilyAction().perform();
          break;
        case DELETE_COLUMNFAMILY:
          // reduce probability of deleting column family to 20%
          if (RandomUtils.nextInt(100) < 20) {
            new DeleteColumnFamilyAction().perform();
          }
          break;
        case ALTER_FAMILYVERSIONS:
          new AlterFamilyVersionsAction().perform();
          break;
        case ALTER_FAMILYENCODING:
          new AlterFamilyEncodingAction().perform();
          break;
        }
      } catch (Exception ex) {
        // save the failure and stop this worker; the test asserts on it later
        this.savedException = ex;
        return;
      }
    }
    LOG.info(this.getName() + " stopped");
  }

  /** Returns the first exception thrown in this thread, or null. */
  public Exception getSavedException(){
    return this.savedException;
  }

  /** Returns the action being attempted when the exception occurred. */
  public ACTION getAction(){
    return this.action;
  }
}
|
||||||
|
|
||||||
|
private void checkException(List<Worker> workers){
|
||||||
|
if(workers == null || workers.isEmpty())
|
||||||
|
return;
|
||||||
|
for (Worker worker : workers){
|
||||||
|
Exception e = worker.getSavedException();
|
||||||
|
if (e != null) {
|
||||||
|
LOG.error("Found exception in thread: " + worker.getName());
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
Assert.assertNull("Action failed: " + worker.getAction() + " in thread: "
|
||||||
|
+ worker.getName(), e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
 * Drives the whole test: launches {@code numThreads} Worker threads that
 * perform random DDL actions, lets them run for the configured runtime,
 * stops them, then verifies table state and runs hbck.
 *
 * @return 0 on success (assertions abort the test on failure)
 * @throws Exception if verification, hbck, or thread joining fails
 */
private int runTest() throws Exception {
  LOG.info("Starting the test");

  // Runtime and thread count are configurable per test class name,
  // falling back to the defaults.
  String runtimeKey = String.format(RUN_TIME_KEY, this.getClass().getSimpleName());
  long runtime = util.getConfiguration().getLong(runtimeKey, DEFAULT_RUN_TIME);

  String numThreadKey = String.format(NUM_THREADS_KEY, this.getClass().getSimpleName());
  numThreads = util.getConfiguration().getInt(numThreadKey, DEFAULT_NUM_THREADS);

  ArrayList<Worker> workers = new ArrayList<>();
  for (int i = 0; i < numThreads; i++) {
    // Fail fast if a worker already started has hit an exception.
    checkException(workers);
    Worker worker = new Worker();
    LOG.info("Launching worker thread " + worker.getName());
    workers.add(worker);
    worker.start();
  }

  // First half of the runtime: workers may create new tables.
  Threads.sleep(runtime / 2);
  LOG.info("Stopping creating new tables");
  create_table.set(false);
  // Second half: only act on the existing tables, then stop everything.
  Threads.sleep(runtime / 2);
  LOG.info("Runtime is up");
  running.set(false);

  checkException(workers);

  // Wait for every worker to observe running=false and exit.
  for (Worker worker : workers) {
    worker.join();
  }
  LOG.info("All Worker threads stopped");

  // verify
  LOG.info("Verify actions of all threads succeeded");
  checkException(workers);
  LOG.info("Verify states of all tables");
  verifyTables();

  // RUN HBCK

  HBaseFsck hbck = null;
  try {
    LOG.info("Running hbck");
    hbck = HbckTestingUtil.doFsck(util.getConfiguration(), false);
    HbckTestingUtil.assertNoErrors(hbck);
    LOG.info("Finished hbck");
  } finally {
    // hbck holds cluster connections; always release them.
    if (hbck != null) {
      hbck.close();
    }
  }
  return 0;
}
|
||||||
|
|
||||||
|
@Override
public TableName getTablename() {
  // This test creates and deletes many tables at random rather than
  // operating on one fixed table, so there is no single table to report.
  return null;
}
|
||||||
|
|
||||||
|
@Override
protected Set<String> getColumnFamilies() {
  // Column families are added and deleted dynamically by the worker actions,
  // so there is no fixed set to return.
  return null;
}
|
||||||
|
|
||||||
|
/**
 * Stand-alone entry point: runs the test against an existing distributed
 * cluster via ToolRunner.
 *
 * @param args command-line arguments forwarded to ToolRunner
 * @throws Exception on unexpected failures outside connection setup
 */
public static void main(String[] args) throws Exception {
  Configuration conf = HBaseConfiguration.create();
  IntegrationTestingUtility.setUseDistributedCluster(conf);
  IntegrationTestDDLMasterFailover masterFailover = new IntegrationTestDDLMasterFailover();
  Connection connection = null;
  int ret = 1;  // non-zero default so setup failures exit as failures
  try {
    // Initialize connection once, then pass to Actions
    LOG.debug("Setting up connection ...");
    connection = ConnectionFactory.createConnection(conf);
    masterFailover.setConnection(connection);
    ret = ToolRunner.run(conf, masterFailover, args);
  } catch (IOException e){
    LOG.fatal("Failed to establish connection. Aborting test ...", e);
  } finally {
    // Re-read the connection from the test instance in case it replaced the
    // one we handed it, then close whatever is current before exiting.
    connection = masterFailover.getConnection();
    if (connection != null){
      connection.close();
    }
    // NOTE: exiting from finally means any non-IOException from the try
    // block is discarded; the exit code (ret) carries the result instead.
    System.exit(ret);
  }
}
|
||||||
|
}
|
|
@ -0,0 +1,71 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.chaos.factories;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.chaos.actions.Action;
|
||||||
|
import org.apache.hadoop.hbase.chaos.actions.DumpClusterStatusAction;
|
||||||
|
import org.apache.hadoop.hbase.chaos.actions.RestartActiveMasterAction;
|
||||||
|
import org.apache.hadoop.hbase.chaos.monkies.ChaosMonkey;
|
||||||
|
import org.apache.hadoop.hbase.chaos.monkies.PolicyBasedChaosMonkey;
|
||||||
|
import org.apache.hadoop.hbase.chaos.policies.PeriodicRandomActionPolicy;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A chaos monkey to kill the active master periodically. Can be run in single master
|
||||||
|
* or multi master setup.
|
||||||
|
*/
|
||||||
|
public class MasterKillingMonkeyFactory extends MonkeyFactory {
|
||||||
|
|
||||||
|
private long action1Period;
|
||||||
|
private long action2Period;
|
||||||
|
|
||||||
|
private long restartActiveMasterSleepTime;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ChaosMonkey build() {
|
||||||
|
loadProperties();
|
||||||
|
|
||||||
|
// Destructive actions to mess things around.
|
||||||
|
Action[] actions1 = new Action[] {
|
||||||
|
new RestartActiveMasterAction(restartActiveMasterSleepTime),
|
||||||
|
};
|
||||||
|
|
||||||
|
// Action to log more info for debugging
|
||||||
|
Action[] actions2 = new Action[] {
|
||||||
|
new DumpClusterStatusAction()
|
||||||
|
};
|
||||||
|
|
||||||
|
return new PolicyBasedChaosMonkey(util,
|
||||||
|
new PeriodicRandomActionPolicy(action1Period, actions1),
|
||||||
|
new PeriodicRandomActionPolicy(action2Period, actions2));
|
||||||
|
}
|
||||||
|
|
||||||
|
private void loadProperties() {
|
||||||
|
|
||||||
|
action1Period = Long.parseLong(this.properties.getProperty(
|
||||||
|
MonkeyConstants.PERIODIC_ACTION1_PERIOD,
|
||||||
|
MonkeyConstants.DEFAULT_PERIODIC_ACTION1_PERIOD + ""));
|
||||||
|
action2Period = Long.parseLong(this.properties.getProperty(
|
||||||
|
MonkeyConstants.PERIODIC_ACTION2_PERIOD,
|
||||||
|
MonkeyConstants.DEFAULT_PERIODIC_ACTION2_PERIOD + ""));
|
||||||
|
restartActiveMasterSleepTime = Long.parseLong(this.properties.getProperty(
|
||||||
|
MonkeyConstants.RESTART_ACTIVE_MASTER_SLEEP_TIME,
|
||||||
|
MonkeyConstants.DEFAULT_RESTART_ACTIVE_MASTER_SLEEP_TIME + ""));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -69,6 +69,7 @@ public abstract class MonkeyFactory {
|
||||||
public static final String SERVER_KILLING = "serverKilling";
|
public static final String SERVER_KILLING = "serverKilling";
|
||||||
public static final String STRESS_AM = "stressAM";
|
public static final String STRESS_AM = "stressAM";
|
||||||
public static final String NO_KILL = "noKill";
|
public static final String NO_KILL = "noKill";
|
||||||
|
public static final String MASTER_KILLING = "masterKilling";
|
||||||
|
|
||||||
public static Map<String, MonkeyFactory> FACTORIES = ImmutableMap.<String,MonkeyFactory>builder()
|
public static Map<String, MonkeyFactory> FACTORIES = ImmutableMap.<String,MonkeyFactory>builder()
|
||||||
.put(CALM, new CalmMonkeyFactory())
|
.put(CALM, new CalmMonkeyFactory())
|
||||||
|
@ -77,6 +78,7 @@ public abstract class MonkeyFactory {
|
||||||
.put(SERVER_KILLING, new ServerKillingMonkeyFactory())
|
.put(SERVER_KILLING, new ServerKillingMonkeyFactory())
|
||||||
.put(STRESS_AM, new StressAssignmentManagerMonkeyFactory())
|
.put(STRESS_AM, new StressAssignmentManagerMonkeyFactory())
|
||||||
.put(NO_KILL, new NoKillMonkeyFactory())
|
.put(NO_KILL, new NoKillMonkeyFactory())
|
||||||
|
.put(MASTER_KILLING, new MasterKillingMonkeyFactory())
|
||||||
.build();
|
.build();
|
||||||
|
|
||||||
public static MonkeyFactory getFactory(String factoryName) {
|
public static MonkeyFactory getFactory(String factoryName) {
|
||||||
|
|
Loading…
Reference in New Issue