HBASE-21487 Concurrent modify table ops can lead to unexpected results

Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Syeda 2019-02-27 19:22:43 +05:30 committed by Guanghao Zhang
parent cbdbe6572b
commit 030b4d12fb
5 changed files with 283 additions and 17 deletions

View File

@ -0,0 +1,57 @@
/**
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.yetus.audience.InterfaceAudience;
/**
* Thrown when a table has been modified concurrently
*/
@InterfaceAudience.Public
public class ConcurrentTableModificationException extends DoNotRetryIOException {
private static final long serialVersionUID = 7453646730058600581L;
/** default constructor */
public ConcurrentTableModificationException() {
super();
}
/**
* Constructor
* @param s message
*/
public ConcurrentTableModificationException(String s) {
super(s);
}
/**
* @param tableName Name of table that is modified concurrently
*/
public ConcurrentTableModificationException(byte[] tableName) {
this(Bytes.toString(tableName));
}
/**
* @param tableName Name of table that is modified concurrently
*/
public ConcurrentTableModificationException(TableName tableName) {
this(tableName.getNameAsString());
}
}

View File

@ -78,6 +78,7 @@ message ModifyTableStateData {
optional TableSchema unmodified_table_schema = 2;
required TableSchema modified_table_schema = 3;
required bool delete_column_family_in_modify = 4;
optional bool should_check_descriptor = 5;
}
enum TruncateTableState {

View File

@ -2506,7 +2506,7 @@ public class HMaster extends HRegionServer implements MasterServices {
return TableDescriptorBuilder.newBuilder(old).setColumnFamily(column).build();
}
}, nonceGroup, nonce);
}, nonceGroup, nonce, true);
}
/**
@ -2533,7 +2533,7 @@ public class HMaster extends HRegionServer implements MasterServices {
return TableDescriptorBuilder.newBuilder(old).modifyColumnFamily(descriptor).build();
}
}, nonceGroup, nonce);
}, nonceGroup, nonce, true);
}
@Override
@ -2558,7 +2558,7 @@ public class HMaster extends HRegionServer implements MasterServices {
}
return TableDescriptorBuilder.newBuilder(old).removeColumnFamily(columnName).build();
}
}, nonceGroup, nonce);
}, nonceGroup, nonce, true);
}
@Override
@ -2651,8 +2651,8 @@ public class HMaster extends HRegionServer implements MasterServices {
}
private long modifyTable(final TableName tableName,
final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce)
throws IOException {
final TableDescriptorGetter newDescriptorGetter, final long nonceGroup, final long nonce,
final boolean shouldCheckDescriptor) throws IOException {
return MasterProcedureUtil
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
@Override
@ -2670,8 +2670,8 @@ public class HMaster extends HRegionServer implements MasterServices {
// We need to wait for the procedure to potentially fail due to "prepare" sanity
// checks. This will block only the beginning of the procedure. See HBASE-19953.
ProcedurePrepareLatch latch = ProcedurePrepareLatch.createBlockingLatch();
submitProcedure(
new ModifyTableProcedure(procedureExecutor.getEnvironment(), newDescriptor, latch));
submitProcedure(new ModifyTableProcedure(procedureExecutor.getEnvironment(),
newDescriptor, latch, oldDescriptor, shouldCheckDescriptor));
latch.await();
getMaster().getMasterCoprocessorHost().postModifyTable(tableName, oldDescriptor,
@ -2695,7 +2695,7 @@ public class HMaster extends HRegionServer implements MasterServices {
public TableDescriptor get() throws IOException {
return newDescriptor;
}
}, nonceGroup, nonce);
}, nonceGroup, nonce, false);
}

View File

@ -22,6 +22,8 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import org.apache.hadoop.hbase.ConcurrentTableModificationException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseIOException;
import org.apache.hadoop.hbase.HConstants;
@ -56,10 +58,11 @@ public class ModifyTableProcedure
private TableDescriptor unmodifiedTableDescriptor = null;
private TableDescriptor modifiedTableDescriptor;
private boolean deleteColumnFamilyInModify;
private boolean shouldCheckDescriptor;
public ModifyTableProcedure() {
super();
initilize();
initilize(null, false);
}
public ModifyTableProcedure(final MasterProcedureEnv env, final TableDescriptor htd)
@ -70,14 +73,23 @@ public class ModifyTableProcedure
public ModifyTableProcedure(final MasterProcedureEnv env, final TableDescriptor htd,
final ProcedurePrepareLatch latch)
throws HBaseIOException {
this(env, htd, latch, null, false);
}
public ModifyTableProcedure(final MasterProcedureEnv env,
final TableDescriptor newTableDescriptor, final ProcedurePrepareLatch latch,
final TableDescriptor oldTableDescriptor, final boolean shouldCheckDescriptor)
throws HBaseIOException {
super(env, latch);
initilize();
this.modifiedTableDescriptor = htd;
initilize(oldTableDescriptor, shouldCheckDescriptor);
this.modifiedTableDescriptor = newTableDescriptor;
preflightChecks(env, null/*No table checks; if changing peers, table can be online*/);
}
private void initilize() {
this.unmodifiedTableDescriptor = null;
private void initilize(final TableDescriptor unmodifiedTableDescriptor,
final boolean shouldCheckDescriptor) {
this.unmodifiedTableDescriptor = unmodifiedTableDescriptor;
this.shouldCheckDescriptor = shouldCheckDescriptor;
this.deleteColumnFamilyInModify = false;
}
@ -188,7 +200,8 @@ public class ModifyTableProcedure
MasterProcedureProtos.ModifyTableStateData.newBuilder()
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(getUser()))
.setModifiedTableSchema(ProtobufUtil.toTableSchema(modifiedTableDescriptor))
.setDeleteColumnFamilyInModify(deleteColumnFamilyInModify);
.setDeleteColumnFamilyInModify(deleteColumnFamilyInModify)
.setShouldCheckDescriptor(shouldCheckDescriptor);
if (unmodifiedTableDescriptor != null) {
modifyTableMsg
@ -208,6 +221,8 @@ public class ModifyTableProcedure
setUser(MasterProcedureUtil.toUserInfo(modifyTableMsg.getUserInfo()));
modifiedTableDescriptor = ProtobufUtil.toTableDescriptor(modifyTableMsg.getModifiedTableSchema());
deleteColumnFamilyInModify = modifyTableMsg.getDeleteColumnFamilyInModify();
shouldCheckDescriptor = modifyTableMsg.hasShouldCheckDescriptor()
? modifyTableMsg.getShouldCheckDescriptor() : false;
if (modifyTableMsg.hasUnmodifiedTableSchema()) {
unmodifiedTableDescriptor =
@ -242,9 +257,24 @@ public class ModifyTableProcedure
" should have at least one column family.");
}
// In order to update the descriptor, we need to retrieve the old descriptor for comparison.
// If descriptor check is enabled, check whether the table descriptor when procedure was
// submitted matches with the current
// table descriptor of the table, else retrieve the old descriptor
// for comparison in order to update the descriptor.
if (shouldCheckDescriptor) {
if (TableDescriptor.COMPARATOR.compare(unmodifiedTableDescriptor,
env.getMasterServices().getTableDescriptors().get(getTableName())) != 0) {
LOG.error("Error while modifying table '" + getTableName().toString()
+ "' Skipping procedure : " + this);
throw new ConcurrentTableModificationException(
"Skipping modify table operation on table '" + getTableName().toString()
+ "' as it has already been modified by some other concurrent operation, "
+ "Please retry.");
}
} else {
this.unmodifiedTableDescriptor =
env.getMasterServices().getTableDescriptors().get(getTableName());
}
if (env.getMasterServices().getTableStateManager()
.isTableState(getTableName(), TableState.State.ENABLED)) {

View File

@ -22,12 +22,15 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.hadoop.hbase.ConcurrentTableModificationException;
import org.apache.hadoop.hbase.DoNotRetryIOException;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptor;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
import org.apache.hadoop.hbase.client.PerClientRandomNonceGenerator;
import org.apache.hadoop.hbase.client.RegionInfo;
@ -57,6 +60,10 @@ public class TestModifyTableProcedure extends TestTableDDLProcedureBase {
@Rule public TestName name = new TestName();
private static final String column_Family1 = "cf1";
private static final String column_Family2 = "cf2";
private static final String column_Family3 = "cf3";
@Test
public void testModifyTable() throws Exception {
final TableName tableName = TableName.valueOf(name.getMethodName());
@ -398,4 +405,175 @@ public class TestModifyTableProcedure extends TestTableDDLProcedureBase {
MasterProcedureTestingUtility.validateTableCreation(UTIL.getHBaseCluster().getMaster(),
tableName, regions, "cf1");
}
@Test
public void testConcurrentAddColumnFamily() throws IOException, InterruptedException {
final TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.createTable(tableName, column_Family1);
class ConcurrentAddColumnFamily extends Thread {
TableName tableName = null;
HColumnDescriptor hcd = null;
boolean exception;
public ConcurrentAddColumnFamily(TableName tableName, HColumnDescriptor hcd) {
this.tableName = tableName;
this.hcd = hcd;
this.exception = false;
}
public void run() {
try {
UTIL.getAdmin().addColumnFamily(tableName, hcd);
} catch (Exception e) {
if (e.getClass().equals(ConcurrentTableModificationException.class)) {
this.exception = true;
}
}
}
}
ConcurrentAddColumnFamily t1 =
new ConcurrentAddColumnFamily(tableName, new HColumnDescriptor(column_Family2));
ConcurrentAddColumnFamily t2 =
new ConcurrentAddColumnFamily(tableName, new HColumnDescriptor(column_Family3));
t1.start();
t2.start();
t1.join();
t2.join();
int noOfColumnFamilies = UTIL.getAdmin().getDescriptor(tableName).getColumnFamilies().length;
assertTrue("Expected ConcurrentTableModificationException.",
((t1.exception || t2.exception) && noOfColumnFamilies == 2) || noOfColumnFamilies == 3);
}
@Test
public void testConcurrentDeleteColumnFamily() throws IOException, InterruptedException {
final TableName tableName = TableName.valueOf(name.getMethodName());
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(column_Family1));
htd.addFamily(new HColumnDescriptor(column_Family2));
htd.addFamily(new HColumnDescriptor(column_Family3));
UTIL.getAdmin().createTable(htd);
class ConcurrentCreateDeleteTable extends Thread {
TableName tableName = null;
String columnFamily = null;
boolean exception;
public ConcurrentCreateDeleteTable(TableName tableName, String columnFamily) {
this.tableName = tableName;
this.columnFamily = columnFamily;
this.exception = false;
}
public void run() {
try {
UTIL.getAdmin().deleteColumnFamily(tableName, columnFamily.getBytes());
} catch (Exception e) {
if (e.getClass().equals(ConcurrentTableModificationException.class)) {
this.exception = true;
}
}
}
}
ConcurrentCreateDeleteTable t1 = new ConcurrentCreateDeleteTable(tableName, column_Family2);
ConcurrentCreateDeleteTable t2 = new ConcurrentCreateDeleteTable(tableName, column_Family3);
t1.start();
t2.start();
t1.join();
t2.join();
int noOfColumnFamilies = UTIL.getAdmin().getDescriptor(tableName).getColumnFamilies().length;
assertTrue("Expected ConcurrentTableModificationException.",
((t1.exception || t2.exception) && noOfColumnFamilies == 2) || noOfColumnFamilies == 1);
}
@Test
public void testConcurrentModifyColumnFamily() throws IOException, InterruptedException {
final TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.createTable(tableName, column_Family1);
class ConcurrentModifyColumnFamily extends Thread {
TableName tableName = null;
ColumnFamilyDescriptor hcd = null;
boolean exception;
public ConcurrentModifyColumnFamily(TableName tableName, ColumnFamilyDescriptor hcd) {
this.tableName = tableName;
this.hcd = hcd;
this.exception = false;
}
public void run() {
try {
UTIL.getAdmin().modifyColumnFamily(tableName, hcd);
} catch (Exception e) {
if (e.getClass().equals(ConcurrentTableModificationException.class)) {
this.exception = true;
}
}
}
}
ColumnFamilyDescriptor modColumnFamily1 = ColumnFamilyDescriptorBuilder
.newBuilder(column_Family1.getBytes()).setMaxVersions(5).build();
ColumnFamilyDescriptor modColumnFamily2 = ColumnFamilyDescriptorBuilder
.newBuilder(column_Family1.getBytes()).setMaxVersions(6).build();
ConcurrentModifyColumnFamily t1 = new ConcurrentModifyColumnFamily(tableName, modColumnFamily1);
ConcurrentModifyColumnFamily t2 = new ConcurrentModifyColumnFamily(tableName, modColumnFamily2);
t1.start();
t2.start();
t1.join();
t2.join();
int maxVersions = UTIL.getAdmin().getDescriptor(tableName)
.getColumnFamily(column_Family1.getBytes()).getMaxVersions();
assertTrue("Expected ConcurrentTableModificationException.", (t1.exception && maxVersions == 5)
|| (t2.exception && maxVersions == 6) || !(t1.exception && t2.exception));
}
@Test
public void testConcurrentModifyTable() throws IOException, InterruptedException {
final TableName tableName = TableName.valueOf(name.getMethodName());
UTIL.createTable(tableName, column_Family1);
class ConcurrentModifyTable extends Thread {
TableName tableName = null;
TableDescriptor htd = null;
boolean exception;
public ConcurrentModifyTable(TableName tableName, TableDescriptor htd) {
this.tableName = tableName;
this.htd = htd;
this.exception = false;
}
public void run() {
try {
UTIL.getAdmin().modifyTable(tableName, htd);
} catch (Exception e) {
if (e.getClass().equals(ConcurrentTableModificationException.class)) {
this.exception = true;
}
}
}
}
TableDescriptor htd = UTIL.getAdmin().getDescriptor(tableName);
TableDescriptor modifiedDescriptor =
TableDescriptorBuilder.newBuilder(htd).setCompactionEnabled(false).build();
ConcurrentModifyTable t1 = new ConcurrentModifyTable(tableName, modifiedDescriptor);
ConcurrentModifyTable t2 = new ConcurrentModifyTable(tableName, modifiedDescriptor);
t1.start();
t2.start();
t1.join();
t2.join();
assertFalse("Expected ConcurrentTableModificationException.", (t1.exception || t2.exception));
}
}