HBASE-13210 Procedure V2 - master Modify table (Stephen Yuan Jiang)
This commit is contained in:
parent
67149d253b
commit
c967595acb
File diff suppressed because it is too large
Load Diff
|
@ -58,6 +58,23 @@ message CreateTableStateData {
|
||||||
repeated RegionInfo region_info = 3;
|
repeated RegionInfo region_info = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
enum ModifyTableState {
|
||||||
|
MODIFY_TABLE_PREPARE = 1;
|
||||||
|
MODIFY_TABLE_PRE_OPERATION = 2;
|
||||||
|
MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR = 3;
|
||||||
|
MODIFY_TABLE_REMOVE_REPLICA_COLUMN = 4;
|
||||||
|
MODIFY_TABLE_DELETE_FS_LAYOUT = 5;
|
||||||
|
MODIFY_TABLE_POST_OPERATION = 6;
|
||||||
|
MODIFY_TABLE_REOPEN_ALL_REGIONS = 7;
|
||||||
|
}
|
||||||
|
|
||||||
|
message ModifyTableStateData {
|
||||||
|
required UserInformation user_info = 1;
|
||||||
|
optional TableSchema unmodified_table_schema = 2;
|
||||||
|
required TableSchema modified_table_schema = 3;
|
||||||
|
required bool delete_column_family_in_modify = 4;
|
||||||
|
}
|
||||||
|
|
||||||
enum DeleteTableState {
|
enum DeleteTableState {
|
||||||
DELETE_TABLE_PRE_OPERATION = 1;
|
DELETE_TABLE_PRE_OPERATION = 1;
|
||||||
DELETE_TABLE_REMOVE_FROM_META = 2;
|
DELETE_TABLE_REMOVE_FROM_META = 2;
|
||||||
|
|
|
@ -93,7 +93,6 @@ import org.apache.hadoop.hbase.master.cleaner.LogCleaner;
|
||||||
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
|
import org.apache.hadoop.hbase.master.handler.DisableTableHandler;
|
||||||
import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
|
import org.apache.hadoop.hbase.master.handler.DispatchMergingRegionHandler;
|
||||||
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
|
import org.apache.hadoop.hbase.master.handler.EnableTableHandler;
|
||||||
import org.apache.hadoop.hbase.master.handler.ModifyTableHandler;
|
|
||||||
import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
|
import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
|
||||||
import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
|
import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
|
||||||
import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
|
import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
|
||||||
|
@ -102,15 +101,17 @@ import org.apache.hadoop.hbase.master.procedure.CreateTableProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
|
import org.apache.hadoop.hbase.master.procedure.DeleteTableProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureConstants;
|
||||||
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.ModifyTableProcedure;
|
||||||
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
import org.apache.hadoop.hbase.master.procedure.ProcedurePrepareLatch;
|
||||||
|
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
|
||||||
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
import org.apache.hadoop.hbase.master.snapshot.SnapshotManager;
|
||||||
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
|
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
|
||||||
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
|
||||||
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
import org.apache.hadoop.hbase.procedure.MasterProcedureManagerHost;
|
||||||
|
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
|
||||||
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
import org.apache.hadoop.hbase.procedure2.store.wal.WALProcedureStore;
|
||||||
import org.apache.hadoop.hbase.procedure.flush.MasterFlushTableProcedureManager;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
|
import org.apache.hadoop.hbase.protobuf.generated.HBaseProtos.RegionServerInfo;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
|
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos.SplitLogTask.RecoveryMode;
|
||||||
|
@ -1762,8 +1763,15 @@ public class HMaster extends HRegionServer implements MasterServices, Server {
|
||||||
if (cpHost != null) {
|
if (cpHost != null) {
|
||||||
cpHost.preModifyTable(tableName, descriptor);
|
cpHost.preModifyTable(tableName, descriptor);
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
|
LOG.info(getClientIdAuditPrefix() + " modify " + tableName);
|
||||||
new ModifyTableHandler(tableName, descriptor, this, this).prepare().process();
|
|
||||||
|
// Execute the operation synchronously - wait for the operation completes before continuing.
|
||||||
|
long procId = this.procedureExecutor.submitProcedure(
|
||||||
|
new ModifyTableProcedure(procedureExecutor.getEnvironment(), descriptor));
|
||||||
|
|
||||||
|
ProcedureSyncWait.waitForProcedureToComplete(procedureExecutor, procId);
|
||||||
|
|
||||||
if (cpHost != null) {
|
if (cpHost != null) {
|
||||||
cpHost.postModifyTable(tableName, descriptor);
|
cpHost.postModifyTable(tableName, descriptor);
|
||||||
}
|
}
|
||||||
|
|
|
@ -580,10 +580,12 @@ public class MasterFileSystem {
|
||||||
Path familyDir = new Path(tableDir,
|
Path familyDir = new Path(tableDir,
|
||||||
new Path(region.getEncodedName(), Bytes.toString(familyName)));
|
new Path(region.getEncodedName(), Bytes.toString(familyName)));
|
||||||
if (fs.delete(familyDir, true) == false) {
|
if (fs.delete(familyDir, true) == false) {
|
||||||
throw new IOException("Could not delete family "
|
if (fs.exists(familyDir)) {
|
||||||
+ Bytes.toString(familyName) + " from FileSystem for region "
|
throw new IOException("Could not delete family "
|
||||||
+ region.getRegionNameAsString() + "(" + region.getEncodedName()
|
+ Bytes.toString(familyName) + " from FileSystem for region "
|
||||||
+ ")");
|
+ region.getRegionNameAsString() + "(" + region.getEncodedName()
|
||||||
|
+ ")");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -0,0 +1,167 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.LinkedList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.NavigableMap;
|
||||||
|
import java.util.TreeMap;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HRegionLocation;
|
||||||
|
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||||
|
import org.apache.hadoop.hbase.ServerName;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||||
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.RegionLocator;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||||
|
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||||
|
import org.apache.hadoop.hbase.master.BulkReOpen;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterFileSystem;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Helper class for schema change procedures
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public final class MasterDDLOperationHelper {
|
||||||
|
private static final Log LOG = LogFactory.getLog(MasterDDLOperationHelper.class);
|
||||||
|
|
||||||
|
private MasterDDLOperationHelper() {}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether online schema change is allowed from config
|
||||||
|
**/
|
||||||
|
public static boolean isOnlineSchemaChangeAllowed(final MasterProcedureEnv env) {
|
||||||
|
return env.getMasterServices().getConfiguration()
|
||||||
|
.getBoolean("hbase.online.schema.update.enable", false);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check whether a table is modifiable - exists and either offline or online with config set
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @param tableName name of the table
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
public static void checkTableModifiable(final MasterProcedureEnv env, final TableName tableName)
|
||||||
|
throws IOException {
|
||||||
|
// Checks whether the table exists
|
||||||
|
if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), tableName)) {
|
||||||
|
throw new TableNotFoundException(tableName);
|
||||||
|
}
|
||||||
|
|
||||||
|
// We only execute this procedure with table online if online schema change config is set.
|
||||||
|
if (!env.getMasterServices().getAssignmentManager().getTableStateManager()
|
||||||
|
.isTableState(tableName, ZooKeeperProtos.Table.State.DISABLED)
|
||||||
|
&& !MasterDDLOperationHelper.isOnlineSchemaChangeAllowed(env)) {
|
||||||
|
throw new TableNotDisabledException(tableName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove the column family from the file system
|
||||||
|
**/
|
||||||
|
public static void deleteColumnFamilyFromFileSystem(
|
||||||
|
final MasterProcedureEnv env,
|
||||||
|
final TableName tableName,
|
||||||
|
List<HRegionInfo> regionInfoList,
|
||||||
|
final byte[] familyName) throws IOException {
|
||||||
|
final MasterFileSystem mfs = env.getMasterServices().getMasterFileSystem();
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Removing family=" + Bytes.toString(familyName) + " from table=" + tableName);
|
||||||
|
}
|
||||||
|
if (regionInfoList == null) {
|
||||||
|
regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, tableName);
|
||||||
|
}
|
||||||
|
for (HRegionInfo hri : regionInfoList) {
|
||||||
|
// Delete the family directory in FS for all the regions one by one
|
||||||
|
mfs.deleteFamilyFromFS(hri, familyName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Reopen all regions from a table after a schema change operation.
|
||||||
|
**/
|
||||||
|
public static boolean reOpenAllRegions(
|
||||||
|
final MasterProcedureEnv env,
|
||||||
|
final TableName tableName,
|
||||||
|
final List<HRegionInfo> regionInfoList) throws IOException {
|
||||||
|
boolean done = false;
|
||||||
|
LOG.info("Bucketing regions by region server...");
|
||||||
|
List<HRegionLocation> regionLocations = null;
|
||||||
|
Connection connection = env.getMasterServices().getConnection();
|
||||||
|
try (RegionLocator locator = connection.getRegionLocator(tableName)) {
|
||||||
|
regionLocations = locator.getAllRegionLocations();
|
||||||
|
}
|
||||||
|
// Convert List<HRegionLocation> to Map<HRegionInfo, ServerName>.
|
||||||
|
NavigableMap<HRegionInfo, ServerName> hri2Sn = new TreeMap<HRegionInfo, ServerName>();
|
||||||
|
for (HRegionLocation location : regionLocations) {
|
||||||
|
hri2Sn.put(location.getRegionInfo(), location.getServerName());
|
||||||
|
}
|
||||||
|
TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps.newTreeMap();
|
||||||
|
List<HRegionInfo> reRegions = new ArrayList<HRegionInfo>();
|
||||||
|
for (HRegionInfo hri : regionInfoList) {
|
||||||
|
ServerName sn = hri2Sn.get(hri);
|
||||||
|
// Skip the offlined split parent region
|
||||||
|
// See HBASE-4578 for more information.
|
||||||
|
if (null == sn) {
|
||||||
|
LOG.info("Skip " + hri);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
if (!serverToRegions.containsKey(sn)) {
|
||||||
|
LinkedList<HRegionInfo> hriList = Lists.newLinkedList();
|
||||||
|
serverToRegions.put(sn, hriList);
|
||||||
|
}
|
||||||
|
reRegions.add(hri);
|
||||||
|
serverToRegions.get(sn).add(hri);
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Reopening " + reRegions.size() + " regions on " + serverToRegions.size()
|
||||||
|
+ " region servers.");
|
||||||
|
AssignmentManager am = env.getMasterServices().getAssignmentManager();
|
||||||
|
am.setRegionsToReopen(reRegions);
|
||||||
|
BulkReOpen bulkReopen = new BulkReOpen(env.getMasterServices(), serverToRegions, am);
|
||||||
|
while (true) {
|
||||||
|
try {
|
||||||
|
if (bulkReopen.bulkReOpen()) {
|
||||||
|
done = true;
|
||||||
|
break;
|
||||||
|
} else {
|
||||||
|
LOG.warn("Timeout before reopening all regions");
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
LOG.warn("Reopen was interrupted");
|
||||||
|
// Preserve the interrupt.
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return done;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,510 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.io.InputStream;
|
||||||
|
import java.io.OutputStream;
|
||||||
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.MetaTableAccessor;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||||
|
import org.apache.hadoop.hbase.TableNotFoundException;
|
||||||
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
|
import org.apache.hadoop.hbase.client.Result;
|
||||||
|
import org.apache.hadoop.hbase.client.ResultScanner;
|
||||||
|
import org.apache.hadoop.hbase.client.Scan;
|
||||||
|
import org.apache.hadoop.hbase.client.Table;
|
||||||
|
import org.apache.hadoop.hbase.executor.EventType;
|
||||||
|
import org.apache.hadoop.hbase.master.MasterCoprocessorHost;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.StateMachineProcedure;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ModifyTableState;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ZooKeeperProtos;
|
||||||
|
import org.apache.hadoop.hbase.util.ServerRegionReplicaUtil;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class ModifyTableProcedure
|
||||||
|
extends StateMachineProcedure<MasterProcedureEnv, ModifyTableState>
|
||||||
|
implements TableProcedureInterface {
|
||||||
|
private static final Log LOG = LogFactory.getLog(ModifyTableProcedure.class);
|
||||||
|
|
||||||
|
private final AtomicBoolean aborted = new AtomicBoolean(false);
|
||||||
|
|
||||||
|
private HTableDescriptor unmodifiedHTableDescriptor = null;
|
||||||
|
private HTableDescriptor modifiedHTableDescriptor;
|
||||||
|
private UserGroupInformation user;
|
||||||
|
private boolean deleteColumnFamilyInModify;
|
||||||
|
|
||||||
|
private List<HRegionInfo> regionInfoList;
|
||||||
|
private Boolean traceEnabled = null;
|
||||||
|
|
||||||
|
public ModifyTableProcedure() {
|
||||||
|
initilize();
|
||||||
|
}
|
||||||
|
|
||||||
|
public ModifyTableProcedure(
|
||||||
|
final MasterProcedureEnv env,
|
||||||
|
final HTableDescriptor htd) throws IOException {
|
||||||
|
initilize();
|
||||||
|
this.modifiedHTableDescriptor = htd;
|
||||||
|
this.user = env.getRequestUser().getUGI();
|
||||||
|
}
|
||||||
|
|
||||||
|
private void initilize() {
|
||||||
|
this.unmodifiedHTableDescriptor = null;
|
||||||
|
this.regionInfoList = null;
|
||||||
|
this.traceEnabled = null;
|
||||||
|
this.deleteColumnFamilyInModify = false;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected Flow executeFromState(final MasterProcedureEnv env, final ModifyTableState state) {
|
||||||
|
if (isTraceEnabled()) {
|
||||||
|
LOG.trace(this + " execute state=" + state);
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
switch (state) {
|
||||||
|
case MODIFY_TABLE_PREPARE:
|
||||||
|
prepareModify(env);
|
||||||
|
setNextState(ModifyTableState.MODIFY_TABLE_PRE_OPERATION);
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_PRE_OPERATION:
|
||||||
|
preModify(env, state);
|
||||||
|
setNextState(ModifyTableState.MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR);
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR:
|
||||||
|
updateTableDescriptor(env);
|
||||||
|
setNextState(ModifyTableState.MODIFY_TABLE_REMOVE_REPLICA_COLUMN);
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_REMOVE_REPLICA_COLUMN:
|
||||||
|
updateReplicaColumnsIfNeeded(env, unmodifiedHTableDescriptor, modifiedHTableDescriptor);
|
||||||
|
if (deleteColumnFamilyInModify) {
|
||||||
|
setNextState(ModifyTableState.MODIFY_TABLE_DELETE_FS_LAYOUT);
|
||||||
|
} else {
|
||||||
|
setNextState(ModifyTableState.MODIFY_TABLE_POST_OPERATION);
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_DELETE_FS_LAYOUT:
|
||||||
|
deleteFromFs(env, unmodifiedHTableDescriptor, modifiedHTableDescriptor);
|
||||||
|
setNextState(ModifyTableState.MODIFY_TABLE_POST_OPERATION);
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_POST_OPERATION:
|
||||||
|
postModify(env, state);
|
||||||
|
setNextState(ModifyTableState.MODIFY_TABLE_REOPEN_ALL_REGIONS);
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
|
||||||
|
reOpenAllRegionsIfTableIsOnline(env);
|
||||||
|
return Flow.NO_MORE_STATE;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException|IOException e) {
|
||||||
|
if (!isRollbackSupported(state)) {
|
||||||
|
// We reach a state that cannot be rolled back. We just need to keep retry.
|
||||||
|
LOG.warn("Error trying to modify table=" + getTableName() + " state=" + state, e);
|
||||||
|
} else {
|
||||||
|
LOG.error("Error trying to modify table=" + getTableName() + " state=" + state, e);
|
||||||
|
setFailure("master-modify-table", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return Flow.HAS_MORE_STATE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void rollbackState(final MasterProcedureEnv env, final ModifyTableState state)
|
||||||
|
throws IOException {
|
||||||
|
if (isTraceEnabled()) {
|
||||||
|
LOG.trace(this + " rollback state=" + state);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
switch (state) {
|
||||||
|
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
|
||||||
|
break; // Nothing to undo.
|
||||||
|
case MODIFY_TABLE_POST_OPERATION:
|
||||||
|
// TODO-MAYBE: call the coprocessor event to un-modify?
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_DELETE_FS_LAYOUT:
|
||||||
|
// Once we reach to this state - we could NOT rollback - as it is tricky to undelete
|
||||||
|
// the deleted files. We are not suppose to reach here, throw exception so that we know
|
||||||
|
// there is a code bug to investigate.
|
||||||
|
assert deleteColumnFamilyInModify;
|
||||||
|
throw new UnsupportedOperationException(this + " rollback of state=" + state
|
||||||
|
+ " is unsupported.");
|
||||||
|
case MODIFY_TABLE_REMOVE_REPLICA_COLUMN:
|
||||||
|
// Undo the replica column update.
|
||||||
|
updateReplicaColumnsIfNeeded(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor);
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_UPDATE_TABLE_DESCRIPTOR:
|
||||||
|
restoreTableDescriptor(env);
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_PRE_OPERATION:
|
||||||
|
// TODO-MAYBE: call the coprocessor event to un-modify?
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_PREPARE:
|
||||||
|
break; // Nothing to undo.
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException("unhandled state=" + state);
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.warn("Fail trying to rollback modify table=" + getTableName() + " state=" + state, e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ModifyTableState getState(final int stateId) {
|
||||||
|
return ModifyTableState.valueOf(stateId);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected int getStateId(final ModifyTableState state) {
|
||||||
|
return state.getNumber();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected ModifyTableState getInitialState() {
|
||||||
|
return ModifyTableState.MODIFY_TABLE_PREPARE;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void setNextState(final ModifyTableState state) {
|
||||||
|
if (aborted.get() && isRollbackSupported(state)) {
|
||||||
|
setAbortFailure("modify-table", "abort requested");
|
||||||
|
} else {
|
||||||
|
super.setNextState(state);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean abort(final MasterProcedureEnv env) {
|
||||||
|
aborted.set(true);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected boolean acquireLock(final MasterProcedureEnv env) {
|
||||||
|
if (!env.isInitialized()) return false;
|
||||||
|
return env.getProcedureQueue().tryAcquireTableWrite(
|
||||||
|
getTableName(),
|
||||||
|
EventType.C_M_MODIFY_TABLE.toString());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void releaseLock(final MasterProcedureEnv env) {
|
||||||
|
env.getProcedureQueue().releaseTableWrite(getTableName());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void serializeStateData(final OutputStream stream) throws IOException {
|
||||||
|
super.serializeStateData(stream);
|
||||||
|
|
||||||
|
MasterProcedureProtos.ModifyTableStateData.Builder modifyTableMsg =
|
||||||
|
MasterProcedureProtos.ModifyTableStateData.newBuilder()
|
||||||
|
.setUserInfo(MasterProcedureUtil.toProtoUserInfo(user))
|
||||||
|
.setModifiedTableSchema(modifiedHTableDescriptor.convert())
|
||||||
|
.setDeleteColumnFamilyInModify(deleteColumnFamilyInModify);
|
||||||
|
|
||||||
|
if (unmodifiedHTableDescriptor != null) {
|
||||||
|
modifyTableMsg.setUnmodifiedTableSchema(unmodifiedHTableDescriptor.convert());
|
||||||
|
}
|
||||||
|
|
||||||
|
modifyTableMsg.build().writeDelimitedTo(stream);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void deserializeStateData(final InputStream stream) throws IOException {
|
||||||
|
super.deserializeStateData(stream);
|
||||||
|
|
||||||
|
MasterProcedureProtos.ModifyTableStateData modifyTableMsg =
|
||||||
|
MasterProcedureProtos.ModifyTableStateData.parseDelimitedFrom(stream);
|
||||||
|
user = MasterProcedureUtil.toUserInfo(modifyTableMsg.getUserInfo());
|
||||||
|
modifiedHTableDescriptor = HTableDescriptor.convert(modifyTableMsg.getModifiedTableSchema());
|
||||||
|
deleteColumnFamilyInModify = modifyTableMsg.getDeleteColumnFamilyInModify();
|
||||||
|
|
||||||
|
if (modifyTableMsg.hasUnmodifiedTableSchema()) {
|
||||||
|
unmodifiedHTableDescriptor =
|
||||||
|
HTableDescriptor.convert(modifyTableMsg.getUnmodifiedTableSchema());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void toStringClassDetails(StringBuilder sb) {
|
||||||
|
sb.append(getClass().getSimpleName());
|
||||||
|
sb.append(" (table=");
|
||||||
|
sb.append(getTableName());
|
||||||
|
sb.append(") user=");
|
||||||
|
sb.append(user);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableName getTableName() {
|
||||||
|
return modifiedHTableDescriptor.getTableName();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public TableOperationType getTableOperationType() {
|
||||||
|
return TableOperationType.EDIT;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check conditions before any real action of modifying a table.
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void prepareModify(final MasterProcedureEnv env) throws IOException {
|
||||||
|
// Checks whether the table exists
|
||||||
|
if (!MetaTableAccessor.tableExists(env.getMasterServices().getConnection(), getTableName())) {
|
||||||
|
throw new TableNotFoundException(getTableName());
|
||||||
|
}
|
||||||
|
|
||||||
|
// In order to update the descriptor, we need to retrieve the old descriptor for comparison.
|
||||||
|
this.unmodifiedHTableDescriptor =
|
||||||
|
env.getMasterServices().getTableDescriptors().get(getTableName());
|
||||||
|
|
||||||
|
if (env.getMasterServices().getAssignmentManager().getTableStateManager()
|
||||||
|
.isTableState(getTableName(), ZooKeeperProtos.Table.State.ENABLED)) {
|
||||||
|
// We only execute this procedure with table online if online schema change config is set.
|
||||||
|
if (!MasterDDLOperationHelper.isOnlineSchemaChangeAllowed(env)) {
|
||||||
|
throw new TableNotDisabledException(getTableName());
|
||||||
|
}
|
||||||
|
|
||||||
|
if (modifiedHTableDescriptor.getRegionReplication() != unmodifiedHTableDescriptor
|
||||||
|
.getRegionReplication()) {
|
||||||
|
throw new IOException("REGION_REPLICATION change is not supported for enabled tables");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Find out whether all column families in unmodifiedHTableDescriptor also exists in
|
||||||
|
// the modifiedHTableDescriptor. This is to determine whether we are safe to rollback.
|
||||||
|
final Set<byte[]> oldFamilies = unmodifiedHTableDescriptor.getFamiliesKeys();
|
||||||
|
final Set<byte[]> newFamilies = modifiedHTableDescriptor.getFamiliesKeys();
|
||||||
|
for (byte[] familyName : oldFamilies) {
|
||||||
|
if (!newFamilies.contains(familyName)) {
|
||||||
|
this.deleteColumnFamilyInModify = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Action before modifying table.
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @param state the procedure state
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
private void preModify(final MasterProcedureEnv env, final ModifyTableState state)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
runCoprocessorAction(env, state);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update descriptor
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @throws IOException
|
||||||
|
**/
|
||||||
|
private void updateTableDescriptor(final MasterProcedureEnv env) throws IOException {
|
||||||
|
env.getMasterServices().getTableDescriptors().add(modifiedHTableDescriptor);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Undo the descriptor change (for rollback)
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @throws IOException
|
||||||
|
**/
|
||||||
|
private void restoreTableDescriptor(final MasterProcedureEnv env) throws IOException {
|
||||||
|
env.getMasterServices().getTableDescriptors().add(unmodifiedHTableDescriptor);
|
||||||
|
|
||||||
|
// delete any new column families from the modifiedHTableDescriptor.
|
||||||
|
deleteFromFs(env, modifiedHTableDescriptor, unmodifiedHTableDescriptor);
|
||||||
|
|
||||||
|
// Make sure regions are opened after table descriptor is updated.
|
||||||
|
reOpenAllRegionsIfTableIsOnline(env);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes from hdfs the families that are not longer present in the new table descriptor.
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void deleteFromFs(final MasterProcedureEnv env,
|
||||||
|
final HTableDescriptor oldHTableDescriptor, final HTableDescriptor newHTableDescriptor)
|
||||||
|
throws IOException {
|
||||||
|
final Set<byte[]> oldFamilies = oldHTableDescriptor.getFamiliesKeys();
|
||||||
|
final Set<byte[]> newFamilies = newHTableDescriptor.getFamiliesKeys();
|
||||||
|
for (byte[] familyName : oldFamilies) {
|
||||||
|
if (!newFamilies.contains(familyName)) {
|
||||||
|
MasterDDLOperationHelper.deleteColumnFamilyFromFileSystem(
|
||||||
|
env,
|
||||||
|
getTableName(),
|
||||||
|
getRegionInfoList(env),
|
||||||
|
familyName);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* update replica column families if necessary.
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void updateReplicaColumnsIfNeeded(
|
||||||
|
final MasterProcedureEnv env,
|
||||||
|
final HTableDescriptor oldHTableDescriptor,
|
||||||
|
final HTableDescriptor newHTableDescriptor) throws IOException {
|
||||||
|
final int oldReplicaCount = oldHTableDescriptor.getRegionReplication();
|
||||||
|
final int newReplicaCount = newHTableDescriptor.getRegionReplication();
|
||||||
|
|
||||||
|
if (newReplicaCount < oldReplicaCount) {
|
||||||
|
Set<byte[]> tableRows = new HashSet<byte[]>();
|
||||||
|
Connection connection = env.getMasterServices().getConnection();
|
||||||
|
Scan scan = MetaTableAccessor.getScanForTableName(getTableName());
|
||||||
|
scan.addColumn(HConstants.CATALOG_FAMILY, HConstants.REGIONINFO_QUALIFIER);
|
||||||
|
|
||||||
|
try (Table metaTable = connection.getTable(TableName.META_TABLE_NAME)) {
|
||||||
|
ResultScanner resScanner = metaTable.getScanner(scan);
|
||||||
|
for (Result result : resScanner) {
|
||||||
|
tableRows.add(result.getRow());
|
||||||
|
}
|
||||||
|
MetaTableAccessor.removeRegionReplicasFromMeta(
|
||||||
|
tableRows,
|
||||||
|
newReplicaCount,
|
||||||
|
oldReplicaCount - newReplicaCount,
|
||||||
|
connection);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Setup replication for region replicas if needed
|
||||||
|
if (newReplicaCount > 1 && oldReplicaCount <= 1) {
|
||||||
|
ServerRegionReplicaUtil.setupRegionReplicaReplication(env.getMasterConfiguration());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Action after modifying table.
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @param state the procedure state
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
private void postModify(final MasterProcedureEnv env, final ModifyTableState state)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
runCoprocessorAction(env, state);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Last action from the procedure - executed when online schema change is supported.
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private void reOpenAllRegionsIfTableIsOnline(final MasterProcedureEnv env) throws IOException {
|
||||||
|
// This operation only run when the table is enabled.
|
||||||
|
if (!env.getMasterServices().getAssignmentManager().getTableStateManager()
|
||||||
|
.isTableState(getTableName(), ZooKeeperProtos.Table.State.ENABLED)) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (MasterDDLOperationHelper.reOpenAllRegions(env, getTableName(), getRegionInfoList(env))) {
|
||||||
|
LOG.info("Completed modify table operation on table " + getTableName());
|
||||||
|
} else {
|
||||||
|
LOG.warn("Error on reopening the regions on table " + getTableName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The procedure could be restarted from a different machine. If the variable is null, we need to
|
||||||
|
* retrieve it.
|
||||||
|
* @return traceEnabled whether the trace is enabled
|
||||||
|
*/
|
||||||
|
private Boolean isTraceEnabled() {
|
||||||
|
if (traceEnabled == null) {
|
||||||
|
traceEnabled = LOG.isTraceEnabled();
|
||||||
|
}
|
||||||
|
return traceEnabled;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Coprocessor Action.
|
||||||
|
* @param env MasterProcedureEnv
|
||||||
|
* @param state the procedure state
|
||||||
|
* @throws IOException
|
||||||
|
* @throws InterruptedException
|
||||||
|
*/
|
||||||
|
private void runCoprocessorAction(final MasterProcedureEnv env, final ModifyTableState state)
|
||||||
|
throws IOException, InterruptedException {
|
||||||
|
final MasterCoprocessorHost cpHost = env.getMasterCoprocessorHost();
|
||||||
|
if (cpHost != null) {
|
||||||
|
user.doAs(new PrivilegedExceptionAction<Void>() {
|
||||||
|
@Override
|
||||||
|
public Void run() throws Exception {
|
||||||
|
switch (state) {
|
||||||
|
case MODIFY_TABLE_PRE_OPERATION:
|
||||||
|
cpHost.preModifyTableHandler(getTableName(), modifiedHTableDescriptor);
|
||||||
|
break;
|
||||||
|
case MODIFY_TABLE_POST_OPERATION:
|
||||||
|
cpHost.postModifyTableHandler(getTableName(), modifiedHTableDescriptor);
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
throw new UnsupportedOperationException(this + " unhandled state=" + state);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/*
|
||||||
|
* Check whether we are in the state that can be rollback
|
||||||
|
*/
|
||||||
|
private boolean isRollbackSupported(final ModifyTableState state) {
|
||||||
|
if (deleteColumnFamilyInModify) {
|
||||||
|
switch (state) {
|
||||||
|
case MODIFY_TABLE_DELETE_FS_LAYOUT:
|
||||||
|
case MODIFY_TABLE_POST_OPERATION:
|
||||||
|
case MODIFY_TABLE_REOPEN_ALL_REGIONS:
|
||||||
|
// It is not safe to rollback if we reach to these states.
|
||||||
|
return false;
|
||||||
|
default:
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<HRegionInfo> getRegionInfoList(final MasterProcedureEnv env) throws IOException {
|
||||||
|
if (regionInfoList == null) {
|
||||||
|
regionInfoList = ProcedureSyncWait.getRegionsFromMeta(env, getTableName());
|
||||||
|
}
|
||||||
|
return regionInfoList;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,402 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.master.procedure;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.HBaseTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.HRegionInfo;
|
||||||
|
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||||
|
import org.apache.hadoop.hbase.TableName;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureExecutor;
|
||||||
|
import org.apache.hadoop.hbase.procedure2.ProcedureTestingUtility;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.MasterProcedureProtos.ModifyTableState;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.junit.After;
|
||||||
|
import org.junit.AfterClass;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestModifyTableProcedure {
|
||||||
|
private static final Log LOG = LogFactory.getLog(TestModifyTableProcedure.class);
|
||||||
|
|
||||||
|
protected static final HBaseTestingUtility UTIL = new HBaseTestingUtility();
|
||||||
|
|
||||||
|
private static void setupConf(Configuration conf) {
|
||||||
|
conf.setInt(MasterProcedureConstants.MASTER_PROCEDURE_THREADS, 1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupCluster() throws Exception {
|
||||||
|
setupConf(UTIL.getConfiguration());
|
||||||
|
UTIL.startMiniCluster(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterClass
|
||||||
|
public static void cleanupTest() throws Exception {
|
||||||
|
try {
|
||||||
|
UTIL.shutdownMiniCluster();
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.warn("failure shutting down cluster", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setup() throws Exception {
|
||||||
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
|
||||||
|
}
|
||||||
|
|
||||||
|
@After
|
||||||
|
public void tearDown() throws Exception {
|
||||||
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(getMasterProcedureExecutor(), false);
|
||||||
|
for (HTableDescriptor htd: UTIL.getHBaseAdmin().listTables()) {
|
||||||
|
LOG.info("Tear down, remove table=" + htd.getTableName());
|
||||||
|
UTIL.deleteTable(htd.getTableName());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testModifyTable() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testModifyTable");
|
||||||
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
|
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "cf");
|
||||||
|
UTIL.getHBaseAdmin().disableTable(tableName);
|
||||||
|
|
||||||
|
// Modify the table descriptor
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
|
||||||
|
// Test 1: Modify 1 property
|
||||||
|
long newMaxFileSize = htd.getMaxFileSize() * 2;
|
||||||
|
htd.setMaxFileSize(newMaxFileSize);
|
||||||
|
htd.setRegionReplication(3);
|
||||||
|
|
||||||
|
long procId1 = ProcedureTestingUtility.submitAndWait(
|
||||||
|
procExec, new ModifyTableProcedure(procExec.getEnvironment(), htd));
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId1));
|
||||||
|
|
||||||
|
HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertEquals(newMaxFileSize, currentHtd.getMaxFileSize());
|
||||||
|
|
||||||
|
// Test 2: Modify multiple properties
|
||||||
|
boolean newReadOnlyOption = htd.isReadOnly() ? false : true;
|
||||||
|
long newMemStoreFlushSize = htd.getMemStoreFlushSize() * 2;
|
||||||
|
htd.setReadOnly(newReadOnlyOption);
|
||||||
|
htd.setMemStoreFlushSize(newMemStoreFlushSize);
|
||||||
|
|
||||||
|
long procId2 = ProcedureTestingUtility.submitAndWait(
|
||||||
|
procExec, new ModifyTableProcedure(procExec.getEnvironment(), htd));
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId2));
|
||||||
|
|
||||||
|
currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertEquals(newReadOnlyOption, currentHtd.isReadOnly());
|
||||||
|
assertEquals(newMemStoreFlushSize, currentHtd.getMemStoreFlushSize());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testModifyTableAddCF() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testModifyTableAddCF");
|
||||||
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
|
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "cf1");
|
||||||
|
HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertEquals(1, currentHtd.getFamiliesKeys().size());
|
||||||
|
|
||||||
|
// Test 1: Modify the table descriptor online
|
||||||
|
String cf2 = "cf2";
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
htd.addFamily(new HColumnDescriptor(cf2));
|
||||||
|
|
||||||
|
long procId = ProcedureTestingUtility.submitAndWait(
|
||||||
|
procExec, new ModifyTableProcedure(procExec.getEnvironment(), htd));
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
|
||||||
|
|
||||||
|
currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertEquals(2, currentHtd.getFamiliesKeys().size());
|
||||||
|
assertTrue(currentHtd.hasFamily(cf2.getBytes()));
|
||||||
|
|
||||||
|
// Test 2: Modify the table descriptor offline
|
||||||
|
UTIL.getHBaseAdmin().disableTable(tableName);
|
||||||
|
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||||
|
String cf3 = "cf3";
|
||||||
|
HTableDescriptor htd2 =
|
||||||
|
new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
htd2.addFamily(new HColumnDescriptor(cf3));
|
||||||
|
|
||||||
|
long procId2 =
|
||||||
|
ProcedureTestingUtility.submitAndWait(procExec,
|
||||||
|
new ModifyTableProcedure(procExec.getEnvironment(), htd2));
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId2));
|
||||||
|
|
||||||
|
currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertTrue(currentHtd.hasFamily(cf3.getBytes()));
|
||||||
|
assertEquals(3, currentHtd.getFamiliesKeys().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testModifyTableDeleteCF() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testModifyTableAddCF");
|
||||||
|
final String cf2 = "cf2";
|
||||||
|
final String cf3 = "cf3";
|
||||||
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
|
MasterProcedureTestingUtility.createTable(procExec, tableName, null, "cf1", cf2, cf3);
|
||||||
|
HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertEquals(3, currentHtd.getFamiliesKeys().size());
|
||||||
|
|
||||||
|
// Test 1: Modify the table descriptor
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
htd.removeFamily(cf2.getBytes());
|
||||||
|
|
||||||
|
long procId = ProcedureTestingUtility.submitAndWait(
|
||||||
|
procExec, new ModifyTableProcedure(procExec.getEnvironment(), htd));
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId));
|
||||||
|
|
||||||
|
currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertEquals(2, currentHtd.getFamiliesKeys().size());
|
||||||
|
assertFalse(currentHtd.hasFamily(cf2.getBytes()));
|
||||||
|
|
||||||
|
// Test 2: Modify the table descriptor offline
|
||||||
|
UTIL.getHBaseAdmin().disableTable(tableName);
|
||||||
|
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||||
|
|
||||||
|
HTableDescriptor htd2 =
|
||||||
|
new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
htd2.removeFamily(cf3.getBytes());
|
||||||
|
|
||||||
|
long procId2 =
|
||||||
|
ProcedureTestingUtility.submitAndWait(procExec,
|
||||||
|
new ModifyTableProcedure(procExec.getEnvironment(), htd2));
|
||||||
|
ProcedureTestingUtility.assertProcNotFailed(procExec.getResult(procId2));
|
||||||
|
|
||||||
|
currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertEquals(1, currentHtd.getFamiliesKeys().size());
|
||||||
|
assertFalse(currentHtd.hasFamily(cf3.getBytes()));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout=60000)
|
||||||
|
public void testRecoveryAndDoubleExecutionOffline() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecutionOffline");
|
||||||
|
final String cf2 = "cf2";
|
||||||
|
final String cf3 = "cf3";
|
||||||
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
|
// create the table
|
||||||
|
HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
|
||||||
|
procExec, tableName, null, "cf1", cf3);
|
||||||
|
UTIL.getHBaseAdmin().disableTable(tableName);
|
||||||
|
|
||||||
|
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||||
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||||
|
|
||||||
|
// Modify multiple properties of the table.
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
boolean newCompactionEnableOption = htd.isCompactionEnabled() ? false : true;
|
||||||
|
htd.setCompactionEnabled(newCompactionEnableOption);
|
||||||
|
htd.addFamily(new HColumnDescriptor(cf2));
|
||||||
|
htd.removeFamily(cf3.getBytes());
|
||||||
|
htd.setRegionReplication(3);
|
||||||
|
|
||||||
|
// Start the Modify procedure && kill the executor
|
||||||
|
long procId =
|
||||||
|
procExec.submitProcedure(new ModifyTableProcedure(procExec.getEnvironment(), htd));
|
||||||
|
|
||||||
|
// Restart the executor and execute the step twice
|
||||||
|
int numberOfSteps = ModifyTableState.values().length;
|
||||||
|
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(
|
||||||
|
procExec,
|
||||||
|
procId,
|
||||||
|
numberOfSteps,
|
||||||
|
ModifyTableState.values());
|
||||||
|
|
||||||
|
// Validate descriptor
|
||||||
|
HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertEquals(newCompactionEnableOption, currentHtd.isCompactionEnabled());
|
||||||
|
assertEquals(2, currentHtd.getFamiliesKeys().size());
|
||||||
|
|
||||||
|
// cf2 should be added cf3 should be removed
|
||||||
|
MasterProcedureTestingUtility.validateTableCreation(UTIL.getHBaseCluster().getMaster(),
|
||||||
|
tableName, regions, false, "cf1", cf2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testRecoveryAndDoubleExecutionOnline() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testRecoveryAndDoubleExecutionOnline");
|
||||||
|
final String cf2 = "cf2";
|
||||||
|
final String cf3 = "cf3";
|
||||||
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
|
// create the table
|
||||||
|
HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
|
||||||
|
procExec, tableName, null, "cf1", cf3);
|
||||||
|
|
||||||
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||||
|
|
||||||
|
// Modify multiple properties of the table.
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
boolean newCompactionEnableOption = htd.isCompactionEnabled() ? false : true;
|
||||||
|
htd.setCompactionEnabled(newCompactionEnableOption);
|
||||||
|
htd.addFamily(new HColumnDescriptor(cf2));
|
||||||
|
htd.removeFamily(cf3.getBytes());
|
||||||
|
|
||||||
|
// Start the Modify procedure && kill the executor
|
||||||
|
long procId =
|
||||||
|
procExec.submitProcedure(new ModifyTableProcedure(procExec.getEnvironment(), htd));
|
||||||
|
|
||||||
|
// Restart the executor and execute the step twice
|
||||||
|
int numberOfSteps = ModifyTableState.values().length;
|
||||||
|
MasterProcedureTestingUtility.testRecoveryAndDoubleExecution(procExec, procId, numberOfSteps,
|
||||||
|
ModifyTableState.values());
|
||||||
|
|
||||||
|
// Validate descriptor
|
||||||
|
HTableDescriptor currentHtd = UTIL.getHBaseAdmin().getTableDescriptor(tableName);
|
||||||
|
assertEquals(newCompactionEnableOption, currentHtd.isCompactionEnabled());
|
||||||
|
assertEquals(2, currentHtd.getFamiliesKeys().size());
|
||||||
|
assertTrue(currentHtd.hasFamily(cf2.getBytes()));
|
||||||
|
assertFalse(currentHtd.hasFamily(cf3.getBytes()));
|
||||||
|
|
||||||
|
// cf2 should be added cf3 should be removed
|
||||||
|
MasterProcedureTestingUtility.validateTableCreation(UTIL.getHBaseCluster().getMaster(),
|
||||||
|
tableName, regions, "cf1", cf2);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testRollbackAndDoubleExecutionOnline() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testRollbackAndDoubleExecution");
|
||||||
|
final String familyName = "cf2";
|
||||||
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
|
// create the table
|
||||||
|
HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
|
||||||
|
procExec, tableName, null, "cf1");
|
||||||
|
|
||||||
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||||
|
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
boolean newCompactionEnableOption = htd.isCompactionEnabled() ? false : true;
|
||||||
|
htd.setCompactionEnabled(newCompactionEnableOption);
|
||||||
|
htd.addFamily(new HColumnDescriptor(familyName));
|
||||||
|
|
||||||
|
// Start the Modify procedure && kill the executor
|
||||||
|
long procId =
|
||||||
|
procExec.submitProcedure(new ModifyTableProcedure(procExec.getEnvironment(), htd));
|
||||||
|
|
||||||
|
// Restart the executor and rollback the step twice
|
||||||
|
int numberOfSteps = ModifyTableState.values().length - 4; // failing in the middle of proc
|
||||||
|
MasterProcedureTestingUtility.testRollbackAndDoubleExecution(
|
||||||
|
procExec,
|
||||||
|
procId,
|
||||||
|
numberOfSteps,
|
||||||
|
ModifyTableState.values());
|
||||||
|
|
||||||
|
// cf2 should not be present
|
||||||
|
MasterProcedureTestingUtility.validateTableCreation(UTIL.getHBaseCluster().getMaster(),
|
||||||
|
tableName, regions, "cf1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testRollbackAndDoubleExecutionOffline() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testRollbackAndDoubleExecution");
|
||||||
|
final String familyName = "cf2";
|
||||||
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
|
// create the table
|
||||||
|
HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
|
||||||
|
procExec, tableName, null, "cf1");
|
||||||
|
UTIL.getHBaseAdmin().disableTable(tableName);
|
||||||
|
|
||||||
|
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||||
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||||
|
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
boolean newCompactionEnableOption = htd.isCompactionEnabled() ? false : true;
|
||||||
|
htd.setCompactionEnabled(newCompactionEnableOption);
|
||||||
|
htd.addFamily(new HColumnDescriptor(familyName));
|
||||||
|
htd.setRegionReplication(3);
|
||||||
|
|
||||||
|
// Start the Modify procedure && kill the executor
|
||||||
|
long procId =
|
||||||
|
procExec.submitProcedure(new ModifyTableProcedure(procExec.getEnvironment(), htd));
|
||||||
|
|
||||||
|
// Restart the executor and rollback the step twice
|
||||||
|
int numberOfSteps = ModifyTableState.values().length - 4; // failing in the middle of proc
|
||||||
|
MasterProcedureTestingUtility.testRollbackAndDoubleExecution(
|
||||||
|
procExec,
|
||||||
|
procId,
|
||||||
|
numberOfSteps,
|
||||||
|
ModifyTableState.values());
|
||||||
|
|
||||||
|
// cf2 should not be present
|
||||||
|
MasterProcedureTestingUtility.validateTableCreation(UTIL.getHBaseCluster().getMaster(),
|
||||||
|
tableName, regions, "cf1");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testRollbackAndDoubleExecutionAfterPONR() throws Exception {
|
||||||
|
final TableName tableName = TableName.valueOf("testRollbackAndDoubleExecutionAfterPONR");
|
||||||
|
final String familyToAddName = "cf2";
|
||||||
|
final String familyToRemove = "cf1";
|
||||||
|
final ProcedureExecutor<MasterProcedureEnv> procExec = getMasterProcedureExecutor();
|
||||||
|
|
||||||
|
// create the table
|
||||||
|
HRegionInfo[] regions = MasterProcedureTestingUtility.createTable(
|
||||||
|
procExec, tableName, null, familyToRemove);
|
||||||
|
UTIL.getHBaseAdmin().disableTable(tableName);
|
||||||
|
|
||||||
|
ProcedureTestingUtility.waitNoProcedureRunning(procExec);
|
||||||
|
ProcedureTestingUtility.setKillAndToggleBeforeStoreUpdate(procExec, true);
|
||||||
|
|
||||||
|
HTableDescriptor htd = new HTableDescriptor(UTIL.getHBaseAdmin().getTableDescriptor(tableName));
|
||||||
|
htd.setCompactionEnabled(!htd.isCompactionEnabled());
|
||||||
|
htd.addFamily(new HColumnDescriptor(familyToAddName));
|
||||||
|
htd.removeFamily(familyToRemove.getBytes());
|
||||||
|
htd.setRegionReplication(3);
|
||||||
|
|
||||||
|
// Start the Modify procedure && kill the executor
|
||||||
|
long procId =
|
||||||
|
procExec.submitProcedure(new ModifyTableProcedure(procExec.getEnvironment(), htd));
|
||||||
|
|
||||||
|
// Failing after MODIFY_TABLE_DELETE_FS_LAYOUT we should not trigger the rollback.
|
||||||
|
// NOTE: the 5 (number of MODIFY_TABLE_DELETE_FS_LAYOUT + 1 step) is hardcoded,
|
||||||
|
// so you have to look at this test at least once when you add a new step.
|
||||||
|
int numberOfSteps = 5;
|
||||||
|
MasterProcedureTestingUtility.testRollbackAndDoubleExecutionAfterPONR(
|
||||||
|
procExec,
|
||||||
|
procId,
|
||||||
|
numberOfSteps,
|
||||||
|
ModifyTableState.values());
|
||||||
|
|
||||||
|
// "cf2" should be added and "cf1" should be removed
|
||||||
|
MasterProcedureTestingUtility.validateTableCreation(UTIL.getHBaseCluster().getMaster(),
|
||||||
|
tableName, regions, false, familyToAddName);
|
||||||
|
}
|
||||||
|
|
||||||
|
private ProcedureExecutor<MasterProcedureEnv> getMasterProcedureExecutor() {
|
||||||
|
return UTIL.getHBaseCluster().getMaster().getMasterProcedureExecutor();
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue