HBASE-4213 Support for fault-tolerant, instant schema updates without master's intervention through ZK

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1204611 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Zhihong Yu 2011-11-21 17:29:32 +00:00
parent 46b1b65cc7
commit 7a8c36be23
28 changed files with 3022 additions and 111 deletions

View File

@ -338,7 +338,7 @@
<configuration> <configuration>
<forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds> <forkedProcessTimeoutInSeconds>900</forkedProcessTimeoutInSeconds>
<argLine>-enableassertions -Xmx1400m</argLine> <argLine>-enableassertions -Xmx1900m</argLine>
<redirectTestOutputToFile>true</redirectTestOutputToFile> <redirectTestOutputToFile>true</redirectTestOutputToFile>
</configuration> </configuration>
</plugin> </plugin>

View File

@ -305,7 +305,9 @@ public class LocalHBaseCluster {
*/ */
public HMaster getActiveMaster() { public HMaster getActiveMaster() {
for (JVMClusterUtil.MasterThread mt : masterThreads) { for (JVMClusterUtil.MasterThread mt : masterThreads) {
if (mt.getMaster().isActiveMaster()) { // Ensure that the current active master is not stopped.
// We don't want to return a stopping master as an active master.
if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
return mt.getMaster(); return mt.getMaster();
} }
} }
@ -449,4 +451,4 @@ public class LocalHBaseCluster {
admin.createTable(htd); admin.createTable(htd);
cluster.shutdown(); cluster.shutdown();
} }
} }

View File

@ -141,13 +141,12 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
* Constructor * Constructor
*/ */
EventType(int value) {} EventType(int value) {}
public boolean isOnlineSchemaChangeSupported() { public boolean isSchemaChangeEvent() {
return ( return (
this.equals(EventType.C_M_ADD_FAMILY) || this.equals(EventType.C_M_ADD_FAMILY) ||
this.equals(EventType.C_M_DELETE_FAMILY) || this.equals(EventType.C_M_DELETE_FAMILY) ||
this.equals(EventType.C_M_MODIFY_FAMILY) || this.equals(EventType.C_M_MODIFY_FAMILY) ||
this.equals(EventType.C_M_MODIFY_TABLE) this.equals(EventType.C_M_MODIFY_TABLE));
);
} }
} }

View File

@ -244,13 +244,6 @@ public interface HMasterInterface extends VersionedProtocol {
*/ */
public HTableDescriptor[] getHTableDescriptors(); public HTableDescriptor[] getHTableDescriptors();
/**
* Get current HTD for a given tablename
* @param tableName
* @return HTableDescriptor for the table
*/
//public HTableDescriptor getHTableDescriptor(final byte[] tableName);
/** /**
* Get array of HTDs for requested tables. * Get array of HTDs for requested tables.
* @param tableNames * @param tableNames
@ -258,4 +251,11 @@ public interface HMasterInterface extends VersionedProtocol {
*/ */
public HTableDescriptor[] getHTableDescriptors(List<String> tableNames); public HTableDescriptor[] getHTableDescriptors(List<String> tableNames);
/**
* Returns the current running status of load balancer.
* @return True if LoadBalancer is running now else False.
*/
public boolean isLoadBalancerRunning();
} }

View File

@ -272,10 +272,9 @@ public class AssignmentManager extends ZooKeeperListener {
* @param tableName * @param tableName
* @return Pair indicating the status of the alter command * @return Pair indicating the status of the alter command
* @throws IOException * @throws IOException
* @throws InterruptedException
*/ */
public Pair<Integer, Integer> getReopenStatus(byte[] tableName) public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
throws IOException, InterruptedException { throws IOException {
List <HRegionInfo> hris = List <HRegionInfo> hris =
MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName); MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName);
Integer pending = 0; Integer pending = 0;

View File

@ -27,8 +27,8 @@ import java.net.InetSocketAddress;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.Set; import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -54,6 +54,7 @@ import org.apache.hadoop.hbase.client.MetaScanner;
import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
import org.apache.hadoop.hbase.io.hfile.CacheConfig; import org.apache.hadoop.hbase.io.hfile.CacheConfig;
@ -72,7 +73,6 @@ import org.apache.hadoop.hbase.master.handler.TableAddFamilyHandler;
import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
import org.apache.hadoop.hbase.master.metrics.MasterMetrics; import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
import org.apache.hadoop.hbase.master.RegionPlan;
import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@ -88,8 +88,9 @@ import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.util.VersionInfo;
import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterId;
import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker; import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Text;
@ -158,7 +159,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
private CatalogTracker catalogTracker; private CatalogTracker catalogTracker;
// Cluster status zk tracker and local setter // Cluster status zk tracker and local setter
private ClusterStatusTracker clusterStatusTracker; private ClusterStatusTracker clusterStatusTracker;
// Schema change tracker
private MasterSchemaChangeTracker schemaChangeTracker;
// buffer for "fatal error" notices from region servers // buffer for "fatal error" notices from region servers
// in the cluster. This is only used for assisting // in the cluster. This is only used for assisting
// operations/debugging. // operations/debugging.
@ -184,11 +188,18 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
private CatalogJanitor catalogJanitorChore; private CatalogJanitor catalogJanitorChore;
private LogCleaner logCleaner; private LogCleaner logCleaner;
private Thread schemaJanitorChore;
private MasterCoprocessorHost cpHost; private MasterCoprocessorHost cpHost;
private final ServerName serverName; private final ServerName serverName;
private TableDescriptors tableDescriptors; private TableDescriptors tableDescriptors;
// Whether schema alter changes go through ZK.
private boolean supportInstantSchemaChanges = false;
private volatile boolean loadBalancerRunning = false;
/** /**
* Initializes the HMaster. The steps are as follows: * Initializes the HMaster. The steps are as follows:
@ -254,6 +265,17 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true); this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
this.rpcServer.startThreads(); this.rpcServer.startThreads();
this.metrics = new MasterMetrics(getServerName().toString()); this.metrics = new MasterMetrics(getServerName().toString());
// initialize instant schema change settings
this.supportInstantSchemaChanges = conf.getBoolean(
"hbase.instant.schema.alter.enabled", false);
if (supportInstantSchemaChanges) {
LOG.info("Instant schema change enabled. All schema alter operations will " +
"happen through ZK.");
}
else {
LOG.info("Instant schema change disabled. All schema alter operations will " +
"happen normally.");
}
} }
/** /**
@ -386,6 +408,12 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
boolean wasUp = this.clusterStatusTracker.isClusterUp(); boolean wasUp = this.clusterStatusTracker.isClusterUp();
if (!wasUp) this.clusterStatusTracker.setClusterUp(); if (!wasUp) this.clusterStatusTracker.setClusterUp();
// initialize schema change tracker
this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(),
this, this,
conf.getInt("hbase.instant.schema.alter.timeout", 60000));
this.schemaChangeTracker.start();
LOG.info("Server active/primary master; " + this.serverName + LOG.info("Server active/primary master; " + this.serverName +
", sessionid=0x" + ", sessionid=0x" +
Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
@ -497,6 +525,9 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.catalogJanitorChore = new CatalogJanitor(this, this); this.catalogJanitorChore = new CatalogJanitor(this, this);
Threads.setDaemonThreadRunning(catalogJanitorChore.getThread()); Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());
// Schema janitor chore.
this.schemaJanitorChore = getAndStartSchemaJanitorChore(this);
status.markComplete("Initialization successful"); status.markComplete("Initialization successful");
LOG.info("Master has completed initialization"); LOG.info("Master has completed initialization");
initialized = true; initialized = true;
@ -622,6 +653,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
return this.tableDescriptors; return this.tableDescriptors;
} }
@Override
public MasterSchemaChangeTracker getSchemaChangeTracker() {
return this.schemaChangeTracker;
}
public RegionServerTracker getRegionServerTracker() {
return this.regionServerTracker;
}
/** @return InfoServer object. Maybe null.*/ /** @return InfoServer object. Maybe null.*/
public InfoServer getInfoServer() { public InfoServer getInfoServer() {
return this.infoServer; return this.infoServer;
@ -724,7 +764,28 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
if (this.executorService != null) this.executorService.shutdown(); if (this.executorService != null) this.executorService.shutdown();
} }
private static Thread getAndStartBalancerChore(final HMaster master) { /**
* Start the schema janitor. This Janitor will periodically sweep the failed/expired schema
* changes.
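* @param master the master whose configuration and schema change tracker the chore uses
* @return the daemon thread running the schema janitor chore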
*/
private Thread getAndStartSchemaJanitorChore(final HMaster master) {
String name = master.getServerName() + "-SchemaJanitorChore";
int schemaJanitorPeriod =
master.getConfiguration().getInt("hbase.instant.schema.janitor.period", 120000);
// Start up the schema janitor chore
Chore chore = new Chore(name, schemaJanitorPeriod, master) {
@Override
protected void chore() {
master.getSchemaChangeTracker().handleFailedOrExpiredSchemaChanges();
}
};
return Threads.setDaemonThreadRunning(chore.getThread());
}
private Thread getAndStartBalancerChore(final HMaster master) {
String name = master.getServerName() + "-BalancerChore"; String name = master.getServerName() + "-BalancerChore";
int balancerPeriod = int balancerPeriod =
master.getConfiguration().getInt("hbase.balancer.period", 300000); master.getConfiguration().getInt("hbase.balancer.period", 300000);
@ -745,6 +806,10 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
if (this.catalogJanitorChore != null) { if (this.catalogJanitorChore != null) {
this.catalogJanitorChore.interrupt(); this.catalogJanitorChore.interrupt();
} }
if (this.schemaJanitorChore != null) {
this.schemaJanitorChore.interrupt();
}
} }
@Override @Override
@ -815,6 +880,15 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
return balancerCutoffTime; return balancerCutoffTime;
} }
/**
* Check whether the Load Balancer is currently running.
* @return true if the Load balancer is currently running.
*/
public boolean isLoadBalancerRunning() {
return loadBalancerRunning;
}
@Override @Override
public boolean balance() { public boolean balance() {
// If balance not true, don't run balancer. // If balance not true, don't run balancer.
@ -822,23 +896,33 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
// Do this call outside of synchronized block. // Do this call outside of synchronized block.
int maximumBalanceTime = getBalancerCutoffTime(); int maximumBalanceTime = getBalancerCutoffTime();
long cutoffTime = System.currentTimeMillis() + maximumBalanceTime; long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
boolean balancerRan; boolean balancerRan = false;
synchronized (this.balancer) { synchronized (this.balancer) {
if (loadBalancerRunning) {
LOG.debug("Load balancer is currently running. Skipping the current execution.");
return false;
}
// Only allow one balance run at a time. // Only allow one balance run at a time.
if (this.assignmentManager.isRegionsInTransition()) { if (this.assignmentManager.isRegionsInTransition()) {
LOG.debug("Not running balancer because " + LOG.debug("Not running balancer because " +
this.assignmentManager.getRegionsInTransition().size() + this.assignmentManager.getRegionsInTransition().size() +
" region(s) in transition: " + " region(s) in transition: " +
org.apache.commons.lang.StringUtils. org.apache.commons.lang.StringUtils.
abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256)); abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256));
return false; return false;
} }
if (this.serverManager.areDeadServersInProgress()) { if (this.serverManager.areDeadServersInProgress()) {
LOG.debug("Not running balancer because processing dead regionserver(s): " + LOG.debug("Not running balancer because processing dead regionserver(s): " +
this.serverManager.getDeadServers()); this.serverManager.getDeadServers());
return false; return false;
} }
if (schemaChangeTracker.isSchemaChangeInProgress()) {
LOG.debug("Schema change operation is in progress. Waiting for " +
"it to complete before running the load balancer.");
return false;
}
loadBalancerRunning = true;
if (this.cpHost != null) { if (this.cpHost != null) {
try { try {
if (this.cpHost.preBalance()) { if (this.cpHost.preBalance()) {
@ -855,13 +939,13 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
this.assignmentManager.getAssignments(); this.assignmentManager.getAssignments();
// Returned Map from AM does not include mention of servers w/o assignments. // Returned Map from AM does not include mention of servers w/o assignments.
for (Map.Entry<ServerName, HServerLoad> e: for (Map.Entry<ServerName, HServerLoad> e:
this.serverManager.getOnlineServers().entrySet()) { this.serverManager.getOnlineServers().entrySet()) {
if (!assignments.containsKey(e.getKey())) { if (!assignments.containsKey(e.getKey())) {
assignments.put(e.getKey(), new ArrayList<HRegionInfo>()); assignments.put(e.getKey(), new ArrayList<HRegionInfo>());
} }
} }
List<RegionPlan> plans = this.balancer.balanceCluster(assignments); List<RegionPlan> plans = this.balancer.balanceCluster(assignments);
int rpCount = 0; // number of RegionPlans balanced so far int rpCount = 0; // number of RegionPlans balanced so far
long totalRegPlanExecTime = 0; long totalRegPlanExecTime = 0;
balancerRan = plans != null; balancerRan = plans != null;
if (plans != null && !plans.isEmpty()) { if (plans != null && !plans.isEmpty()) {
@ -875,7 +959,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
// if performing next balance exceeds cutoff time, exit the loop // if performing next balance exceeds cutoff time, exit the loop
(System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) { (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) {
LOG.debug("No more balancing till next balance run; maximumBalanceTime=" + LOG.debug("No more balancing till next balance run; maximumBalanceTime=" +
maximumBalanceTime); maximumBalanceTime);
break; break;
} }
} }
@ -888,6 +972,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
LOG.error("Error invoking master coprocessor postBalance()", ioe); LOG.error("Error invoking master coprocessor postBalance()", ioe);
} }
} }
loadBalancerRunning = false;
} }
return balancerRan; return balancerRan;
} }
@ -1036,22 +1121,63 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
if (cpHost != null) { if (cpHost != null) {
cpHost.preDeleteTable(tableName); cpHost.preDeleteTable(tableName);
} }
this.executorService.submit(new DeleteTableHandler(tableName, this, this)); this.executorService.submit(new DeleteTableHandler(tableName, this, this, this,
supportInstantSchemaChanges));
if (cpHost != null) { if (cpHost != null) {
cpHost.postDeleteTable(tableName); cpHost.postDeleteTable(tableName);
} }
} }
/**
* Get the number of regions of the table that have been updated by the alter.
*
* @return Pair indicating the number of regions updated: Pair.getFirst is the
* number of regions that are yet to be updated; Pair.getSecond is the total
* number of regions of the table
*/
public Pair<Integer, Integer> getAlterStatus(byte[] tableName) public Pair<Integer, Integer> getAlterStatus(byte[] tableName)
throws IOException { throws IOException {
if (supportInstantSchemaChanges) {
return getAlterStatusFromSchemaChangeTracker(tableName);
}
return this.assignmentManager.getReopenStatus(tableName);
}
/**
* Used by the client to identify if all regions have the schema updates
*
* @param tableName
* @return Pair indicating the status of the alter command
* @throws IOException
*/
private Pair<Integer, Integer> getAlterStatusFromSchemaChangeTracker(byte[] tableName)
throws IOException {
MasterSchemaChangeTracker.MasterAlterStatus alterStatus = null;
try { try {
return this.assignmentManager.getReopenStatus(tableName); alterStatus =
} catch (InterruptedException e) { this.schemaChangeTracker.getMasterAlterStatus(Bytes.toString(tableName));
throw new IOException("Interrupted", e); } catch (KeeperException ke) {
LOG.error("KeeperException while getting schema alter status for table = "
+ Bytes.toString(tableName), ke);
}
if (alterStatus != null) {
LOG.debug("Getting AlterStatus from SchemaChangeTracker for table = "
+ Bytes.toString(tableName) + " Alter Status = "
+ alterStatus.toString());
int numberPending = alterStatus.getNumberOfRegionsToProcess() -
alterStatus.getNumberOfRegionsProcessed();
return new Pair<Integer, Integer>(alterStatus.getNumberOfRegionsProcessed(),
alterStatus.getNumberOfRegionsToProcess());
} else {
LOG.debug("MasterAlterStatus is NULL for table = "
+ Bytes.toString(tableName));
// should we throw IOException here as it makes more sense?
return new Pair<Integer, Integer>(0,0);
} }
} }
public void addColumn(byte [] tableName, HColumnDescriptor column) public void addColumn(byte [] tableName, HColumnDescriptor column)
throws IOException { throws IOException {
if (cpHost != null) { if (cpHost != null) {
@ -1059,7 +1185,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
return; return;
} }
} }
new TableAddFamilyHandler(tableName, column, this, this).process(); new TableAddFamilyHandler(tableName, column, this, this,
this, supportInstantSchemaChanges).process();
if (cpHost != null) { if (cpHost != null) {
cpHost.postAddColumn(tableName, column); cpHost.postAddColumn(tableName, column);
} }
@ -1072,7 +1199,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
return; return;
} }
} }
new TableModifyFamilyHandler(tableName, descriptor, this, this).process(); new TableModifyFamilyHandler(tableName, descriptor, this, this,
this, supportInstantSchemaChanges).process();
if (cpHost != null) { if (cpHost != null) {
cpHost.postModifyColumn(tableName, descriptor); cpHost.postModifyColumn(tableName, descriptor);
} }
@ -1085,7 +1213,8 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
return; return;
} }
} }
new TableDeleteFamilyHandler(tableName, c, this, this).process(); new TableDeleteFamilyHandler(tableName, c, this, this,
this, supportInstantSchemaChanges).process();
if (cpHost != null) { if (cpHost != null) {
cpHost.postDeleteColumn(tableName, c); cpHost.postDeleteColumn(tableName, c);
} }
@ -1096,7 +1225,7 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
cpHost.preEnableTable(tableName); cpHost.preEnableTable(tableName);
} }
this.executorService.submit(new EnableTableHandler(this, tableName, this.executorService.submit(new EnableTableHandler(this, tableName,
catalogTracker, assignmentManager, false)); catalogTracker, assignmentManager, false));
if (cpHost != null) { if (cpHost != null) {
cpHost.postEnableTable(tableName); cpHost.postEnableTable(tableName);
@ -1152,21 +1281,37 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
@Override @Override
public void modifyTable(final byte[] tableName, HTableDescriptor htd) public void modifyTable(final byte[] tableName, HTableDescriptor htd)
throws IOException { throws IOException {
if (cpHost != null) { if (cpHost != null) {
cpHost.preModifyTable(tableName, htd); cpHost.preModifyTable(tableName, htd);
} }
this.executorService.submit(new ModifyTableHandler(tableName, htd, this, this.executorService.submit(new ModifyTableHandler(tableName, htd, this,
this)); this, this, supportInstantSchemaChanges));
if (cpHost != null) { if (cpHost != null) {
cpHost.postModifyTable(tableName, htd); cpHost.postModifyTable(tableName, htd);
} }
} }
private boolean isOnlineSchemaChangeAllowed() {
return conf.getBoolean(
"hbase.online.schema.update.enable", false);
}
@Override @Override
public void checkTableModifiable(final byte [] tableName) public void checkTableModifiable(final byte [] tableName,
EventHandler.EventType eventType)
throws IOException {
preCheckTableModifiable(tableName);
if (!eventType.isSchemaChangeEvent() ||
!isOnlineSchemaChangeAllowed()) {
if (!getAssignmentManager().getZKTable().
isDisabledTable(Bytes.toString(tableName))) {
throw new TableNotDisabledException(tableName);
}
}
}
private void preCheckTableModifiable(final byte[] tableName)
throws IOException { throws IOException {
String tableNameStr = Bytes.toString(tableName); String tableNameStr = Bytes.toString(tableName);
if (isCatalogTable(tableName)) { if (isCatalogTable(tableName)) {
@ -1175,10 +1320,6 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) { if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) {
throw new TableNotFoundException(tableNameStr); throw new TableNotFoundException(tableNameStr);
} }
if (!getAssignmentManager().getZKTable().
isDisabledTable(Bytes.toString(tableName))) {
throw new TableNotDisabledException(tableName);
}
} }
public void clearFromTransition(HRegionInfo hri) { public void clearFromTransition(HRegionInfo hri) {

View File

@ -504,7 +504,7 @@ public class MasterFileSystem {
*/ */
public HTableDescriptor addColumn(byte[] tableName, HColumnDescriptor hcd) public HTableDescriptor addColumn(byte[] tableName, HColumnDescriptor hcd)
throws IOException { throws IOException {
LOG.info("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " + LOG.debug("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " +
hcd.toString()); hcd.toString());
HTableDescriptor htd = this.services.getTableDescriptors().get(tableName); HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
if (htd == null) { if (htd == null) {

View File

@ -26,7 +26,10 @@ import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.TableDescriptors;
import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotFoundException; import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
/** /**
* Services Master supplies * Services Master supplies
@ -53,12 +56,15 @@ public interface MasterServices extends Server {
public ExecutorService getExecutorService(); public ExecutorService getExecutorService();
/** /**
* Check table is modifiable; i.e. exists and is offline. * Check table modifiable. i.e not ROOT or META and offlined for all commands except
* @param tableName Name of table to check. * alter commands
* @throws TableNotDisabledException * @param tableName
* @throws TableNotFoundException * @param eventType
* @throws IOException
*/ */
public void checkTableModifiable(final byte [] tableName) throws IOException; public void checkTableModifiable(final byte [] tableName,
EventHandler.EventType eventType)
throws IOException;
/** /**
* Create a table using the given table definition. * Create a table using the given table definition.
@ -73,4 +79,17 @@ public interface MasterServices extends Server {
* @return Return table descriptors implementation. * @return Return table descriptors implementation.
*/ */
public TableDescriptors getTableDescriptors(); public TableDescriptors getTableDescriptors();
/**
* Get the master schema change tracker.
* @return the MasterSchemaChangeTracker
*/
public MasterSchemaChangeTracker getSchemaChangeTracker();
/**
* Return the Region server tracker.
* @return RegionServerTracker
*/
public RegionServerTracker getRegionServerTracker();
} }

View File

@ -333,11 +333,21 @@ public class ServerManager {
} }
} }
/**
* Exclude a RS from any pending schema change process.
* @param serverName
*/
private void excludeRegionServerFromSchemaChanges(final ServerName serverName) {
this.services.getSchemaChangeTracker()
.excludeRegionServerForSchemaChanges(serverName.getServerName());
}
/* /*
* Expire the passed server. Add it to list of deadservers and queue a * Expire the passed server. Add it to list of deadservers and queue a
* shutdown processing. * shutdown processing.
*/ */
public synchronized void expireServer(final ServerName serverName) { public synchronized void expireServer(final ServerName serverName) {
excludeRegionServerFromSchemaChanges(serverName);
if (!this.onlineServers.containsKey(serverName)) { if (!this.onlineServers.containsKey(serverName)) {
LOG.warn("Received expiration of " + serverName + LOG.warn("Received expiration of " + serverName +
" but server is not currently online"); " but server is not currently online");

View File

@ -27,6 +27,7 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.catalog.MetaEditor; import org.apache.hadoop.hbase.catalog.MetaEditor;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.AssignmentManager;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -37,9 +38,11 @@ public class DeleteTableHandler extends TableEventHandler {
private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class); private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class);
public DeleteTableHandler(byte [] tableName, Server server, public DeleteTableHandler(byte [] tableName, Server server,
final MasterServices masterServices) final MasterServices masterServices, HMasterInterface masterInterface,
boolean instantChange)
throws IOException { throws IOException {
super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices); super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices,
masterInterface, instantChange);
// The next call fails if no such table. // The next call fails if no such table.
getTableDescriptor(); getTableDescriptor();
} }

View File

@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
public class ModifyTableHandler extends TableEventHandler { public class ModifyTableHandler extends TableEventHandler {
@ -32,9 +33,11 @@ public class ModifyTableHandler extends TableEventHandler {
public ModifyTableHandler(final byte [] tableName, public ModifyTableHandler(final byte [] tableName,
final HTableDescriptor htd, final Server server, final HTableDescriptor htd, final Server server,
final MasterServices masterServices) final MasterServices masterServices, final HMasterInterface masterInterface,
boolean instantModify)
throws IOException { throws IOException {
super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices); super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices,
masterInterface, instantModify);
// Check table exists. // Check table exists.
getTableDescriptor(); getTableDescriptor();
// This is the new schema we are going to write out as this modification. // This is the new schema we are going to write out as this modification.
@ -57,4 +60,4 @@ public class ModifyTableHandler extends TableEventHandler {
return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-" + return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-" +
tableNameStr; tableNameStr;
} }
} }

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
/** /**
@ -37,8 +38,10 @@ public class TableAddFamilyHandler extends TableEventHandler {
private final HColumnDescriptor familyDesc; private final HColumnDescriptor familyDesc;
public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc,
Server server, final MasterServices masterServices) throws IOException { Server server, final MasterServices masterServices,
super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); HMasterInterface masterInterface, boolean instantChange) throws IOException {
super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices,
masterInterface, instantChange);
HTableDescriptor htd = getTableDescriptor(); HTableDescriptor htd = getTableDescriptor();
if (htd.hasFamily(familyDesc.getName())) { if (htd.hasFamily(familyDesc.getName())) {
throw new InvalidFamilyOperationException("Family '" + throw new InvalidFamilyOperationException("Family '" +

View File

@ -25,6 +25,7 @@ import java.util.List;
import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -36,8 +37,10 @@ public class TableDeleteFamilyHandler extends TableEventHandler {
private final byte [] familyName; private final byte [] familyName;
public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName, public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName,
Server server, final MasterServices masterServices) throws IOException { Server server, final MasterServices masterServices,
super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); HMasterInterface masterInterface, boolean instantChange) throws IOException {
super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices,
masterInterface, instantChange);
HTableDescriptor htd = getTableDescriptor(); HTableDescriptor htd = getTableDescriptor();
this.familyName = hasColumnFamily(htd, familyName); this.familyName = hasColumnFamily(htd, familyName);
} }

View File

@ -35,13 +35,19 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.TableExistsException; import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.catalog.MetaReader;
import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.BulkReOpen;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKAssign;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.KeeperException;
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
@ -57,32 +63,22 @@ import com.google.common.collect.Maps;
public abstract class TableEventHandler extends EventHandler { public abstract class TableEventHandler extends EventHandler {
private static final Log LOG = LogFactory.getLog(TableEventHandler.class); private static final Log LOG = LogFactory.getLog(TableEventHandler.class);
protected final MasterServices masterServices; protected final MasterServices masterServices;
protected HMasterInterface master = null;
protected final byte [] tableName; protected final byte [] tableName;
protected final String tableNameStr; protected final String tableNameStr;
protected boolean instantAction = false;
public TableEventHandler(EventType eventType, byte [] tableName, Server server, public TableEventHandler(EventType eventType, byte [] tableName, Server server,
MasterServices masterServices) MasterServices masterServices, HMasterInterface masterInterface,
boolean instantSchemaChange)
throws IOException { throws IOException {
super(server, eventType); super(server, eventType);
this.masterServices = masterServices; this.masterServices = masterServices;
this.tableName = tableName; this.tableName = tableName;
try { this.masterServices.checkTableModifiable(tableName, eventType);
this.masterServices.checkTableModifiable(tableName);
} catch (TableNotDisabledException ex) {
if (isOnlineSchemaChangeAllowed()
&& eventType.isOnlineSchemaChangeSupported()) {
LOG.debug("Ignoring table not disabled exception " +
"for supporting online schema changes.");
} else {
throw ex;
}
}
this.tableNameStr = Bytes.toString(this.tableName); this.tableNameStr = Bytes.toString(this.tableName);
} this.instantAction = instantSchemaChange;
this.master = masterInterface;
private boolean isOnlineSchemaChangeAllowed() {
return this.server.getConfiguration().getBoolean(
"hbase.online.schema.update.enable", false);
} }
@Override @Override
@ -94,16 +90,7 @@ public abstract class TableEventHandler extends EventHandler {
MetaReader.getTableRegions(this.server.getCatalogTracker(), MetaReader.getTableRegions(this.server.getCatalogTracker(),
tableName); tableName);
handleTableOperation(hris); handleTableOperation(hris);
if (eventType.isOnlineSchemaChangeSupported() && this.masterServices. handleSchemaChanges(hris);
getAssignmentManager().getZKTable().
isEnabledTable(Bytes.toString(tableName))) {
if (reOpenAllRegions(hris)) {
LOG.info("Completed table operation " + eventType + " on table " +
Bytes.toString(tableName));
} else {
LOG.warn("Error on reopening the regions");
}
}
} catch (IOException e) { } catch (IOException e) {
LOG.error("Error manipulating table " + Bytes.toString(tableName), e); LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
} catch (KeeperException e) { } catch (KeeperException e) {
@ -111,13 +98,47 @@ public abstract class TableEventHandler extends EventHandler {
} }
} }
/**
 * Routes the post-operation schema handling. The instant path is taken only
 * when instant changes were requested and there is at least one region; every
 * other case goes through the regular path.
 * @param regions regions of the table being altered; may be null or empty
 * @throws IOException propagated from the regular schema change path
 */
private void handleSchemaChanges(List<HRegionInfo> regions)
    throws IOException {
  boolean useInstantPath =
      instantAction && regions != null && !regions.isEmpty();
  if (!useInstantPath) {
    handleRegularSchemaChanges(regions);
    return;
  }
  handleInstantSchemaChanges(regions);
}
/**
 * Tells whether the regions of the table should be touched for this event.
 * @return true only when the event type is a schema change event and the
 *         table is reported as enabled by the assignment manager's ZKTable
 */
private boolean canPerformSchemaChange() {
  if (!eventType.isSchemaChangeEvent()) {
    return false;
  }
  return this.masterServices.getAssignmentManager().getZKTable()
      .isEnabledTable(Bytes.toString(tableName));
}
/**
 * Regular (non-instant) schema change path: when a schema change can be
 * performed, registers the regions with the assignment manager as regions to
 * reopen and then reopens all of them, logging the outcome.
 * @param regions regions of the table being altered
 * @throws IOException propagated from {@link #reOpenAllRegions(List)}
 */
private void handleRegularSchemaChanges(List<HRegionInfo> regions)
    throws IOException {
  if (!canPerformSchemaChange()) {
    return;
  }
  this.masterServices.getAssignmentManager().setRegionsToReopen(regions);
  boolean reopened = reOpenAllRegions(regions);
  if (!reopened) {
    LOG.warn("Error on reopening the regions");
    return;
  }
  LOG.info("Completed table operation " + eventType + " on table " +
      Bytes.toString(tableName));
}
public boolean reOpenAllRegions(List<HRegionInfo> regions) throws IOException { public boolean reOpenAllRegions(List<HRegionInfo> regions) throws IOException {
boolean done = false; boolean done = false;
LOG.info("Bucketing regions by region server..."); LOG.info("Bucketing regions by region server...");
HTable table = new HTable(masterServices.getConfiguration(), tableName); HTable table = new HTable(masterServices.getConfiguration(), tableName);
TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps
.newTreeMap(); .newTreeMap();
NavigableMap<HRegionInfo, ServerName> hriHserverMapping = table.getRegionLocations(); NavigableMap<HRegionInfo, ServerName> hriHserverMapping
= table.getRegionLocations();
List<HRegionInfo> reRegions = new ArrayList<HRegionInfo>(); List<HRegionInfo> reRegions = new ArrayList<HRegionInfo>();
for (HRegionInfo hri : regions) { for (HRegionInfo hri : regions) {
ServerName rsLocation = hriHserverMapping.get(hri); ServerName rsLocation = hriHserverMapping.get(hri);
@ -159,6 +180,91 @@ public abstract class TableEventHandler extends EventHandler {
return done; return done;
} }
/**
 * Checks whether any region in the given list looks like it is undergoing a
 * split. The heuristic is simple: if the ZK assignment node of a region
 * exists, the region is assumed to be splitting.
 * @param regionInfos regions to inspect
 * @return true as soon as one region has an existing assignment node; false
 *         if none has one, or if ZooKeeper could not be queried
 */
private boolean isSplitInProgress(List<HRegionInfo> regionInfos) {
  for (HRegionInfo regionInfo : regionInfos) {
    ZooKeeperWatcher zk = this.masterServices.getZooKeeper();
    String znode = ZKAssign.getNodeName(zk, regionInfo.getEncodedName());
    int version;
    try {
      version = ZKUtil.checkExists(zk, znode);
    } catch (KeeperException ke) {
      LOG.debug("KeeperException while determining splits in progress.", ke);
      // Assume no splits happening?
      return false;
    }
    if (version == -1) {
      // No assignment node for this region; look at the next one.
      continue;
    }
    LOG.debug("Region " + regionInfo.getRegionNameAsString() + " is unassigned. Assuming" +
        " that it is undergoing a split");
    return true;
  }
  return false;
}
/**
 * Waits for any region split in progress to finish, polling every 100 ms.
 * <p>
 * The wait is deliberately not abandoned on interrupt, but the interrupt is
 * remembered and the thread's interrupt status is restored on exit.
 * Re-setting the flag inside the loop would make every following
 * {@code Thread.sleep} throw immediately and turn the poll into a busy spin.
 * @param regions regions to watch for splits
 * @param status monitored task updated while waiting
 */
private void waitForInflightSplit(List<HRegionInfo> regions, MonitoredTask status) {
  boolean interrupted = false;
  try {
    while (isSplitInProgress(regions)) {
      status.setStatus("Alter Schema is waiting for split region to complete.");
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // sleep() cleared the flag; remember it and keep polling.
        interrupted = true;
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/**
 * Instant schema change path: turns the load balancer off, waits for any
 * split in progress, creates the schema change node for the table through
 * the master schema change tracker and waits until that node exists. The
 * previous balancer setting is restored afterwards.
 * <p>
 * Nothing is done when the region list is null/empty or when a schema change
 * cannot be performed for this event/table.
 * @param regions regions of the table being altered
 */
protected void handleInstantSchemaChanges(List<HRegionInfo> regions) {
  if (regions == null || regions.isEmpty()) {
    LOG.debug("Region size is null or empty. Ignoring alter request.");
    return;
  }
  if (!canPerformSchemaChange()) {
    // Checked before creating the monitored task so that no task is left
    // behind unfinished when there is nothing to do.
    return;
  }
  MonitoredTask status = TaskMonitor.get().createStatus(
      "Handling alter table request for table = " + tableNameStr);
  boolean prevBalanceSwitch = false;
  // Only restore the balancer if we actually managed to switch it; otherwise
  // the finally block would force it to the default 'false'.
  boolean balancerSwitched = false;
  boolean interrupted = false;
  try {
    // turn off load balancer synchronously
    prevBalanceSwitch = master.synchronousBalanceSwitch(false);
    balancerSwitched = true;
    waitForInflightSplit(regions, status);
    MasterSchemaChangeTracker masterSchemaChangeTracker =
        this.masterServices.getSchemaChangeTracker();
    masterSchemaChangeTracker.createSchemaChangeNode(
        Bytes.toString(tableName), regions.size());
    while (!masterSchemaChangeTracker.doesSchemaChangeNodeExists(
        Bytes.toString(tableName))) {
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        // Remember the interrupt instead of re-setting the flag here, which
        // would make every following sleep() throw and busy-spin the loop.
        interrupted = true;
      }
    }
    status.markComplete("Created ZK node for handling the alter table request for table = "
        + tableNameStr);
  } catch (KeeperException e) {
    LOG.warn("Instant schema change failed for table " + tableNameStr, e);
    // NOTE(review): abort() is assumed to be the terminal failure state of a
    // MonitoredTask; the original only called setStatus() here.
    status.abort("Instant schema change failed for table " + tableNameStr
        + " Cause = " + e.getCause());
  } catch (IOException ioe) {
    LOG.warn("Instant schema change failed for table " + tableNameStr, ioe);
    status.abort("Instant schema change failed for table " + tableNameStr
        + " Cause = " + ioe.getCause());
  } finally {
    if (balancerSwitched) {
      master.synchronousBalanceSwitch(prevBalanceSwitch);
    }
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
/** /**
* @return Table descriptor for this table * @return Table descriptor for this table
* @throws TableExistsException * @throws TableExistsException

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.InvalidFamilyOperationException;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import org.apache.hadoop.hbase.ipc.HMasterInterface;
import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -38,8 +39,10 @@ public class TableModifyFamilyHandler extends TableEventHandler {
public TableModifyFamilyHandler(byte[] tableName, public TableModifyFamilyHandler(byte[] tableName,
HColumnDescriptor familyDesc, Server server, HColumnDescriptor familyDesc, Server server,
final MasterServices masterServices) throws IOException { final MasterServices masterServices,
super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices); HMasterInterface masterInterface, boolean instantChange) throws IOException {
super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices,
masterInterface, instantChange);
HTableDescriptor htd = getTableDescriptor(); HTableDescriptor htd = getTableDescriptor();
hasColumnFamily(htd, familyDesc.getName()); hasColumnFamily(htd, familyDesc.getName());
this.familyDesc = familyDesc; this.familyDesc = familyDesc;

View File

@ -155,12 +155,29 @@ public class CompactSplitThread implements CompactionRequestor {
return false; return false;
} }
/**
 * Waits for a mid-flight schema alter request on the table (if any),
 * polling every 100 ms. A split must not run while a schema alter is in
 * progress as that would leave things in an inconsistent state.
 * <p>
 * An interrupt does not cut the wait short; it is remembered and the
 * thread's interrupt status is restored on exit. Re-setting the flag inside
 * the loop would make every following {@code Thread.sleep} throw immediately
 * and turn the poll into a busy spin.
 * @param tableName name of the table to check
 */
private void waitForInflightSchemaChange(String tableName) {
  boolean interrupted = false;
  try {
    while (this.server.getSchemaChangeTracker()
        .isSchemaChangeInProgress(tableName)) {
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // sleep() cleared the flag; remember it and keep polling.
        interrupted = true;
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
public synchronized void requestSplit(final HRegion r, byte[] midKey) { public synchronized void requestSplit(final HRegion r, byte[] midKey) {
if (midKey == null) { if (midKey == null) {
LOG.debug("Region " + r.getRegionNameAsString() + LOG.debug("Region " + r.getRegionNameAsString() +
" not splittable because midkey=null"); " not splittable because midkey=null");
return; return;
} }
waitForInflightSchemaChange(r.getRegionInfo().getTableNameAsString());
try { try {
this.splits.execute(new SplitRequest(r, midKey, this.server)); this.splits.execute(new SplitRequest(r, midKey, this.server));
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {

View File

@ -145,6 +145,7 @@ import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker;
import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RemoteException;
@ -283,6 +284,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
// Cluster Status Tracker // Cluster Status Tracker
private ClusterStatusTracker clusterStatusTracker; private ClusterStatusTracker clusterStatusTracker;
// Schema change Tracker
private SchemaChangeTracker schemaChangeTracker;
// Log Splitting Worker // Log Splitting Worker
private SplitLogWorker splitLogWorker; private SplitLogWorker splitLogWorker;
@ -575,6 +579,11 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
catalogTracker.start(); catalogTracker.start();
// Schema change tracker
this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper,
this, this);
this.schemaChangeTracker.start();
} }
/** /**
@ -1670,7 +1679,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
protected void join() { protected void join() {
Threads.shutdown(this.compactionChecker.getThread()); Threads.shutdown(this.compactionChecker.getThread());
Threads.shutdown(this.cacheFlusher.getThread()); Threads.shutdown(this.cacheFlusher.getThread());
Threads.shutdown(this.hlogRoller.getThread()); if (this.hlogRoller != null) {
Threads.shutdown(this.hlogRoller.getThread());
}
if (this.compactSplitThread != null) { if (this.compactSplitThread != null) {
this.compactSplitThread.join(); this.compactSplitThread.join();
} }
@ -2658,9 +2669,26 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
splitRegion(regionInfo, null); splitRegion(regionInfo, null);
} }
/**
 * Waits for a mid-flight schema change request on the table (if any),
 * polling every 100 ms.
 * <p>
 * An interrupt does not cut the wait short; it is remembered and the
 * thread's interrupt status is restored on exit. Re-setting the flag inside
 * the loop would make every following {@code Thread.sleep} throw immediately
 * and turn the poll into a busy spin.
 * @param tableName name of the table to check
 */
private void waitForSchemaChange(String tableName) {
  boolean interrupted = false;
  try {
    while (schemaChangeTracker.isSchemaChangeInProgress(tableName)) {
      LOG.debug("Schema alter is inprogress for table = " + tableName
          + " Waiting for alter to complete before a split");
      try {
        Thread.sleep(100);
      } catch (InterruptedException e) {
        // sleep() cleared the flag; remember it and keep polling.
        interrupted = true;
      }
    }
  } finally {
    if (interrupted) {
      Thread.currentThread().interrupt();
    }
  }
}
@Override @Override
public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint) public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
throws NotServingRegionException, IOException { throws NotServingRegionException, IOException {
waitForSchemaChange(Bytes.toString(regionInfo.getTableName()));
checkOpen(); checkOpen();
HRegion region = getRegion(regionInfo.getRegionName()); HRegion region = getRegion(regionInfo.getRegionName());
region.flushcache(); region.flushcache();
@ -3373,6 +3401,60 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
return wal.rollWriter(true); return wal.rollWriter(true);
} }
/**
 * Refreshes the schema of the given region by closing it, dropping it from
 * the online regions, re-reading the table descriptor and opening the region
 * again with that descriptor. The whole sequence runs while holding the
 * {@code onlineRegions} monitor. A null region is ignored.
 * <p>
 * NOTE(review): if reopening throws, the region has already been closed and
 * removed from the online regions - confirm that callers recover from this.
 * @param hRegion HRegion to refresh; may be null
 * @throws IOException if closing, reading the descriptor or reopening fails
 */
public void refreshRegion(HRegion hRegion) throws IOException {
  if (hRegion == null) {
    return;
  }
  synchronized (this.onlineRegions) {
    HRegionInfo hri = hRegion.getRegionInfo();
    // Take the old instance offline first.
    hRegion.close();
    removeFromOnlineRegions(hri.getEncodedName());
    // Pick up the latest table descriptor.
    HTableDescriptor descriptor = this.tableDescriptors.get(hri.getTableName());
    LOG.debug("HTD for region = " + hri.getRegionNameAsString()
        + " Is = " + descriptor );
    // Reopen with the fresh descriptor and put the new instance online.
    HRegion reopened = HRegion.openHRegion(hRegion.getRegionInfo(), descriptor,
        hlog, conf, this, null);
    addToOnlineRegions(reopened);
  }
}
/**
 * Collects the regions of the given table that are currently online on this
 * server. Only the in-memory {@code onlineRegions} map is consulted;
 * <code>.META.</code> is not read. Regions of the table that are not in
 * that map (for example because they were closed) are not returned, so the
 * result is the set of ONLINE regions and not necessarily all regions of
 * the table.
 * @param tableName name of the table
 * @return online regions whose table name equals <code>tableName</code>;
 *         empty when there are none
 */
public List<HRegion> getOnlineRegions(byte[] tableName) {
  List<HRegion> matches = new ArrayList<HRegion>();
  synchronized (this.onlineRegions) {
    for (HRegion candidate : this.onlineRegions.values()) {
      byte[] candidateTable = candidate.getRegionInfo().getTableName();
      if (!Bytes.equals(candidateTable, tableName)) {
        continue;
      }
      matches.add(candidate);
    }
  }
  return matches;
}
/**
 * @return the schema change tracker held by this region server
 */
public SchemaChangeTracker getSchemaChangeTracker() {
  return schemaChangeTracker;
}
// used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
public String[] getCoprocessors() { public String[] getCoprocessors() {
HServerLoad hsl = buildServerLoad(); HServerLoad hsl = buildServerLoad();

View File

@ -21,6 +21,9 @@ package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.Server;
import java.io.IOException;
import java.util.List;
/** /**
* Interface to Map of online regions. In the Map, the key is the region's * Interface to Map of online regions. In the Map, the key is the region's
* encoded name and the value is an {@link HRegion} instance. * encoded name and the value is an {@link HRegion} instance.
@ -49,4 +52,18 @@ interface OnlineRegions extends Server {
* null if named region is not member of the online regions. * null if named region is not member of the online regions.
*/ */
public HRegion getFromOnlineRegions(String encodedRegionName); public HRegion getFromOnlineRegions(String encodedRegionName);
} /**
* Get all online regions of a table in this RS.
* @param tableName
* @return List of HRegion
* @throws java.io.IOException
*/
public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException;
/**
* Refresh a given region updating it with latest HTD info.
* @param hRegion
*/
public void refreshRegion(HRegion hRegion) throws IOException;
}

View File

@ -0,0 +1,826 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
import org.apache.zookeeper.KeeperException;
public class MasterSchemaChangeTracker extends ZooKeeperNodeTracker {
public static final Log LOG = LogFactory.getLog(MasterSchemaChangeTracker.class);
private final MasterServices masterServices;
// Used by tests only. Do not change this.
private volatile int sleepTimeMillis = 0;
// schema changes pending more than this time will be timed out.
private long schemaChangeTimeoutMillis = 30000;
/**
 * Creates a tracker on the schema znode of the given watcher.
 * <p/>
 * <p>After construction, use {@link #start} to kick off tracking.
 *
 * @param watcher ZooKeeper watcher; its schema znode is the tracked node
 * @param abortable handed to the parent node tracker
 * @param masterServices master services kept by this tracker
 * @param schemaChangeTimeoutMillis time in milliseconds after which a
 *          pending schema change is considered timed out
 */
public MasterSchemaChangeTracker(ZooKeeperWatcher watcher,
    Abortable abortable, MasterServices masterServices,
    long schemaChangeTimeoutMillis) {
  super(watcher, watcher.schemaZNode, abortable);
  this.schemaChangeTimeoutMillis = schemaChangeTimeoutMillis;
  this.masterServices = masterServices;
}
/**
 * Registers this tracker as a listener on the watcher and processes the
 * tables currently listed under the schema znode. A KeeperException during
 * startup is logged and reported through the abortable.
 */
@Override
public void start() {
  try {
    watcher.registerListener(this);
    processCompletedSchemaChanges(getCurrentTables());
  } catch (KeeperException e) {
    LOG.error("MasterSchemaChangeTracker startup failed.", e);
    abortable.abort("MasterSchemaChangeTracker startup failed", e);
  }
}
/**
 * Lists the children of the schema znode without setting a watch.
 * @return child node names as returned by ZKUtil; callers in this class
 *         treat a null result as "no tables"
 * @throws KeeperException on ZooKeeper errors
 */
private List<String> getCurrentTables() throws KeeperException {
  List<String> children =
      ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
  return children;
}
/**
 * When a primary master crashes and the secondary master takes over
 * mid-flight during an alter process, the secondary should cleanup any
 * completed schema changes not handled by the previous master.
 * <p>
 * Each table is processed independently: an IOException on one table is
 * logged and the remaining tables are still processed. The monitored task
 * created for the run is finished in every outcome (completed, or aborted
 * when a KeeperException ends the run early).
 * @param tables names of the tables that have a schema change node; may be
 *          null or empty, in which case nothing is done
 * @throws KeeperException on ZooKeeper errors while processing a table
 */
private void processCompletedSchemaChanges(List<String> tables)
    throws KeeperException {
  if (tables == null || tables.isEmpty()) {
    String msg = "No current schema change in progress. Skipping cleanup";
    LOG.debug(msg);
    return;
  }
  String msg = "Master seeing following tables undergoing schema change " +
      "process. Tables = " + tables;
  MonitoredTask status = TaskMonitor.get().createStatus(msg);
  LOG.debug(msg);
  try {
    for (String table : tables) {
      LOG.debug("Processing table = "+ table);
      status.setStatus("Processing table = "+ table);
      try {
        processTableNode(table);
      } catch (IOException e) {
        String errmsg = "IOException while processing completed schema changes."
            + " Cause = " + e.getCause();
        LOG.error(errmsg, e);
        status.setStatus(errmsg);
      }
    }
  } catch (KeeperException ke) {
    // Do not leave the task dangling when the run ends early.
    status.abort("KeeperException while processing completed schema changes."
        + " Cause = " + ke.getCause());
    throw ke;
  }
  status.markComplete("Finished processing tables undergoing schema change. "
      + "Tables = " + tables);
}
/**
 * Reads the master's alter status stored in the schema change node of the
 * given table.
 * @param tableName name of the table
 * @return the deserialized MasterAlterStatus, or null when the node holds no
 *         data
 * @throws KeeperException on ZooKeeper errors
 * @throws IOException if the node data cannot be deserialized
 */
public MasterAlterStatus getMasterAlterStatus(String tableName)
    throws KeeperException, IOException {
  byte[] data =
      ZKUtil.getData(watcher, getSchemaChangeNodePathForTable(tableName));
  boolean noData = data == null || data.length <= 0;
  if (noData) {
    return null;
  }
  MasterAlterStatus alterStatus = new MasterAlterStatus();
  Writables.getWritable(data, alterStatus);
  return alterStatus;
}
/**
 * Reads the alter status a region server stored in its child node under the
 * schema change node of the given table.
 * @param tableName name of the table
 * @param serverName name of the region server's child node
 * @return the region server's schema alter status, or null when the child
 *         node holds no data
 * @throws KeeperException on ZooKeeper errors
 * @throws IOException if the node data cannot be deserialized
 */
private SchemaChangeTracker.SchemaAlterStatus getRSSchemaAlterStatus(
    String tableName, String serverName)
    throws KeeperException, IOException {
  byte[] rsData = ZKUtil.getData(this.watcher,
      getSchemaChangeNodePathForTableAndServer(tableName, serverName));
  boolean noData = rsData == null || rsData.length <= 0;
  if (noData) {
    return null;
  }
  SchemaChangeTracker.SchemaAlterStatus rsStatus =
      new SchemaChangeTracker.SchemaAlterStatus();
  Writables.getWritable(rsData, rsStatus);
  LOG.debug("Schema Status data for server = " + serverName + " table = "
      + tableName + " == " + rsStatus);
  return rsStatus;
}
/**
 * Folds the alter status reported by each listed region server into the
 * master's alter status. Servers without a stored status are skipped.
 * @param mas master alter status to update in place
 * @param servers names of the region server child nodes to read
 * @param tableName name of the table
 * @throws IOException if a region server status cannot be deserialized
 * @throws KeeperException on ZooKeeper errors
 */
private void updateMasterAlterStatus(MasterAlterStatus mas,
    List<String> servers, String tableName)
    throws IOException, KeeperException {
  for (String rs : servers) {
    SchemaChangeTracker.SchemaAlterStatus rsStatus =
        getRSSchemaAlterStatus(tableName, rs);
    if (rsStatus == null) {
      LOG.debug("SchemaAlterStatus is NULL for table = " + tableName);
      continue;
    }
    mas.update(rsStatus);
    LOG.debug("processTableNodeWithState:Updated Master Alter Status = "
        + mas + " for server = " + rs);
  }
}
/**
 * Processes the schema change node of one table: lists (and watches) the
 * region server child nodes, merges their statuses into the master's alter
 * status, writes the merged status back to the table node and then evaluates
 * it. Nothing is done when the table node holds no master alter status.
 * @param tableName name of the table
 * @throws KeeperException on ZooKeeper errors
 * @throws IOException if a status cannot be (de)serialized
 */
private void processTableNode(String tableName) throws KeeperException,
    IOException {
  LOG.debug("processTableNodeWithState. TableName = " + tableName);
  String tablePath = getSchemaChangeNodePathForTable(tableName);
  List<String> rsNodes = ZKUtil.listChildrenAndWatchThem(watcher, tablePath);
  MasterAlterStatus current = getMasterAlterStatus(tableName);
  if (current == null) {
    LOG.debug("MasterAlterStatus is NULL. Table = " + tableName);
    return;
  }
  updateMasterAlterStatus(current, rsNodes, tableName);
  LOG.debug("Current Alter status = " + current);
  // Persist the merged status before evaluating it.
  byte[] serialized = Writables.getBytes(current);
  ZKUtil.updateExistingNodeData(this.watcher, tablePath, serialized,
      getZKNodeVersion(tablePath));
  processAlterStatus(current, tableName, rsNodes);
}
/**
 * Evaluates the master alter status of a table and acts on it:
 * <ul>
 * <li>all regions processed: a completed task is recorded and the table's
 * schema change nodes are deleted;</li>
 * <li>an error cause is present: an aborted task is recorded;</li>
 * <li>otherwise: the pending state is only logged.</li>
 * </ul>
 * @param alterStatus master alter status to evaluate
 * @param tableName name of the table
 * @param servers region server child nodes seen for the table (used for
 *          logging only)
 * @throws KeeperException if deleting the table's nodes fails
 */
private void processAlterStatus(MasterAlterStatus alterStatus,
    String tableName, List<String> servers)
    throws KeeperException {
  boolean allRegionsProcessed = alterStatus.getNumberOfRegionsToProcess()
      == alterStatus.getNumberOfRegionsProcessed();
  if (allRegionsProcessed) {
    // schema change completed.
    String done = "All region servers have successfully processed the "
        + "schema changes for table = " + tableName
        + " . Deleting the schema change node for table = " + tableName
        + " Region servers processed the schema change"
        + " request = " + alterStatus.getProcessedHosts()
        + " Total number of regions = "
        + alterStatus.getNumberOfRegionsToProcess()
        + " Processed regions = "
        + alterStatus.getNumberOfRegionsProcessed();
    MonitoredTask task = TaskMonitor.get().createStatus(
        "Checking alter schema request status for table = " + tableName);
    task.markComplete(done);
    LOG.debug(done);
    cleanProcessedTableNode(getSchemaChangeNodePathForTable(tableName));
    return;
  }

  boolean hasError = alterStatus.getErrorCause() != null
      && alterStatus.getErrorCause().trim().length() > 0;
  if (!hasError) {
    // Still in progress: nothing to do but log the current state.
    String pending = "Not all region servers have processed the schema changes"
        + "for table = " + tableName
        + " Number of online regions = "
        + alterStatus.getNumberOfRegionsToProcess()
        + " processed regions count = "
        + alterStatus.getNumberOfRegionsProcessed()
        + " Original list = " + alterStatus.hostsToProcess
        + " Processed servers = " + servers
        + " Alter STate = " + alterStatus.getCurrentAlterStatus();
    LOG.debug(pending);
    return;
  }

  // we have errors.
  String failure = "Alter schema change failed "
      + "for table = " + tableName
      + " Number of online regions = "
      + alterStatus.getNumberOfRegionsToProcess()
      + " processed regions count = "
      + alterStatus.getNumberOfRegionsProcessed()
      + " Original list = " + alterStatus.hostsToProcess
      + " Processed servers = " + servers
      + " Error Cause = " + alterStatus.getErrorCause();
  MonitoredTask task = TaskMonitor.get().createStatus(
      "Checking alter schema request status for table = " + tableName);
  LOG.debug(failure);
  task.abort(failure);
}
/**
 * Checks whether an in-flight schema change request has expired, i.e.
 * whether more than {@code schemaChangeTimeoutMillis} have elapsed since the
 * time stamp recorded in the table's master alter status.
 * <p>
 * When the table node holds no master alter status the age cannot be
 * determined; the request is then reported as not expired instead of
 * failing with a NullPointerException.
 * @param tableName name of the table
 * @return true if the schema change request expired
 * @throws IOException if the alter status cannot be deserialized
 * @throws KeeperException on ZooKeeper errors
 */
private boolean hasSchemaChangeExpiredFor(String tableName)
    throws IOException, KeeperException {
  MasterAlterStatus mas = getMasterAlterStatus(tableName);
  if (mas == null) {
    LOG.debug("No master alter status found for table = " + tableName
        + ". Cannot determine expiry; treating the schema change as not expired.");
    return false;
  }
  long createdTimeStamp = mas.getStamp();
  long duration = System.currentTimeMillis() - createdTimeStamp;
  LOG.debug("Created TimeStamp = " + createdTimeStamp
      + " duration = " + duration + " Table = " + tableName
      + " Master Alter Status = " + mas);
  return (duration > schemaChangeTimeoutMillis);
}
/**
 * Handle failed and expired schema changes. We simply delete all the
 * expired/failed schema change attempts. Why we should do this ?
 * 1) Keeping the failed/expired schema change nodes longer prohibits any
 * future schema changes for the table.
 * 2) Any lingering expired/failed schema change requests will prohibit the
 * load balancer from running.
 * <p>
 * Nothing is done when there are no tables under the schema znode. One
 * monitored task is used for the whole sweep and it is always finished:
 * completed at the end, or aborted when an exception ends the sweep early.
 * Exceptions are logged and not propagated.
 */
public void handleFailedOrExpiredSchemaChanges() {
  MonitoredTask status = null;
  try {
    List<String> tables = getCurrentTables();
    if (tables == null || tables.isEmpty()) {
      // The child listing may be null; there is nothing to sweep.
      return;
    }
    String statmsg = "Cleaning failed or expired schema change requests. " +
        "current tables undergoing " +
        "schema change process = " + tables;
    status = TaskMonitor.get().createStatus(statmsg);
    LOG.debug(statmsg);
    for (String table : tables) {
      if (hasSchemaChangeExpiredFor(table)) {
        // time out.. currently, we abandon the in-flight schema change due to
        // time out.
        // Here, there are couple of options to consider. One could be to
        // attempt a retry of the schema change and see if it succeeds, and
        // another could be to simply rollback the schema change effort and
        // see if it succeeds.
        String msg = "Schema change for table = " + table + " has expired."
            + " Schema change for this table has been in progress for more than "
            + schemaChangeTimeoutMillis + " ms. Deleting the node now.";
        LOG.debug(msg);
        ZKUtil.deleteNodeRecursively(this.watcher,
            getSchemaChangeNodePathForTable(table));
        status.setStatus(msg);
      } else {
        String msg = "Schema change request is in progress for " +
            " table = " + table;
        LOG.debug(msg);
        status.setStatus(msg);
      }
    }
    status.markComplete(
        "Finished cleaning failed or expired schema change requests.");
  } catch (IOException e) {
    String msg = "IOException during handleFailedExpiredSchemaChanges."
        + e.getCause();
    LOG.error(msg, e);
    if (status == null) {
      status = TaskMonitor.get().createStatus(msg);
    }
    status.abort(msg);
  } catch (KeeperException ke) {
    String msg = "KeeperException during handleFailedExpiredSchemaChanges."
        + ke.getCause();
    LOG.error(msg, ke);
    if (status == null) {
      status = TaskMonitor.get().createStatus(msg);
    }
    status.abort(msg);
  }
}
/**
 * Remove the ZK nodes of a table whose schema change has been processed.
 * When a positive sleep time has been configured (see
 * {@code setSleepTimeMillis}) the deletion is delayed by that amount first.
 * An interrupt during the delay restores the interrupt flag and the deletion
 * still proceeds.
 *
 * @param path znode path to delete recursively
 * @throws KeeperException propagated from the recursive delete
 */
private void cleanProcessedTableNode(String path) throws KeeperException {
  if (sleepTimeMillis > 0) {
    LOG.debug("Master schema change tracker sleeping for "
        + sleepTimeMillis);
    try {
      Thread.sleep(sleepTimeMillis);
    } catch (InterruptedException interrupted) {
      Thread.currentThread().interrupt();
    }
  }
  ZKUtil.deleteNodeRecursively(this.watcher, path);
  LOG.debug("Deleted all nodes for path " + path);
}
/**
 * Exclude a RS from schema change requests (if applicable).
 * A RS is excluded from schema change request processing if 1) it has
 * online regions for the table AND 2) it went down mid-flight during the
 * schema change process. Nothing else needs to be done for a RS that dies
 * mid-flight: its online regions get reassigned to some other RS and the
 * reassignment inherently takes care of the schema change as well.
 *
 * KeeperException and IOException are logged and not propagated.
 *
 * @param serverName name of the region server to exclude
 */
public void excludeRegionServerForSchemaChanges(String serverName) {
  try {
    MonitoredTask exclusionTask = TaskMonitor.get().createStatus(
        "Processing schema change exclusion for region server = " + serverName);
    List<String> pendingTables =
        ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
    boolean nothingPending = pendingTables == null || pendingTables.isEmpty();
    if (nothingPending) {
      String skipMsg = "No schema change in progress. Skipping exclusion for " +
          "server = " + serverName;
      LOG.debug(skipMsg);
      exclusionTask.setStatus(skipMsg);
    } else {
      // Every table with a schema change node is checked for this server.
      for (String pendingTable : pendingTables) {
        excludeRegionServer(pendingTable, serverName, exclusionTask);
      }
    }
  } catch (KeeperException ke) {
    LOG.error("KeeperException during excludeRegionServerForSchemaChanges", ke);
  } catch (IOException ioe) {
    LOG.error("IOException during excludeRegionServerForSchemaChanges", ioe);
  }
}
/**
 * Check whether a schema change is in progress for a given table on a
 * given RS, i.e. whether the table's schema change node has a child named
 * after the server.
 *
 * @param tableName table whose schema change node is inspected
 * @param serverName region server name to look for among the children
 * @return true if this RS has a child node under the table's schema change
 *         node; false otherwise, including when the child list is
 *         unavailable
 * @throws KeeperException propagated from the ZK children listing
 */
private boolean isSchemaChangeApplicableFor(String tableName,
    String serverName)
    throws KeeperException {
  List<String> servers = ZKUtil.listChildrenAndWatchThem(watcher,
      getSchemaChangeNodePathForTable(tableName));
  // The listing can come back null (other callers in this code base check
  // for it), e.g. if the table node was removed after the table list was
  // read. Treat that as "not applicable" instead of failing with an NPE.
  return servers != null && servers.contains(serverName);
}
/**
 * Exclude a region server for a table (if applicable) from schema change
 * processing. When the server has a node under the table's schema change
 * node, its alter status is rewritten as IGNORED; afterwards the table node
 * is re-processed. If the server's alter status cannot be read the method
 * returns without re-processing the table node.
 *
 * @param tableName table undergoing the schema change
 * @param serverName region server to exclude
 * @param status monitored task that receives progress messages
 * @throws KeeperException on ZK failures
 * @throws IOException on serialization or processing failures
 */
private void excludeRegionServer(String tableName, String serverName,
    MonitoredTask status)
    throws KeeperException, IOException {
  if (!isSchemaChangeApplicableFor(tableName, serverName)) {
    LOG.debug("Skipping exclusion of RS " + serverName
        + " from schema change process"
        + " for table = " + tableName
        + " as it did not possess any online regions for the table");
    processTableNode(tableName);
    return;
  }

  String exclusionMsg = "Excluding RS " + serverName + " from schema change process" +
      " for table = " + tableName;
  LOG.debug(exclusionMsg);
  status.setStatus(exclusionMsg);

  SchemaChangeTracker.SchemaAlterStatus rsStatus =
      getRSSchemaAlterStatus(tableName, serverName);
  if (rsStatus == null) {
    LOG.debug("SchemaAlterStatus is NULL for table = " + tableName
        + " server = " + serverName);
    return;
  }

  // Set the status to IGNORED so we can process it accordingly.
  rsStatus.setCurrentAlterStatus(
      SchemaChangeTracker.SchemaAlterStatus.AlterState.IGNORED);
  LOG.debug("Updating the current schema status to " + rsStatus);

  String rsNodePath = getSchemaChangeNodePathForTableAndServer(tableName,
      serverName);
  byte[] serializedStatus = Writables.getBytes(rsStatus);
  int expectedVersion = getZKNodeVersion(rsNodePath);
  ZKUtil.updateExistingNodeData(this.watcher,
      rsNodePath, serializedStatus, expectedVersion);

  processTableNode(tableName);
}
/**
 * Look up the version of a znode by delegating to {@code ZKUtil.checkExists}.
 * Elsewhere in this class a result of -1 from that call is treated as
 * "node does not exist".
 *
 * @param nodePath full path of the znode
 * @return the value reported by {@code ZKUtil.checkExists} for the path
 * @throws KeeperException propagated from the ZK lookup
 */
private int getZKNodeVersion(String nodePath) throws KeeperException {
  final int nodeVersion = ZKUtil.checkExists(this.watcher, nodePath);
  return nodeVersion;
}
/**
 * Create a new schema change ZK node for a table. If a schema change node
 * already exists for the table, this call blocks (polling every 50 ms)
 * until the previous node is gone. When no region servers are online the
 * monitored task is aborted and no node is created.
 *
 * @param tableName Table name that is getting altered
 * @param numberOfRegions number of regions recorded in the new master alter
 *        status as the regions to process
 * @throws KeeperException on ZK failures
 * @throws IOException on serialization failures, or (as
 *         {@link java.io.InterruptedIOException}) if the thread is
 *         interrupted while waiting for a previous schema change to finish
 */
public void createSchemaChangeNode(String tableName,
    int numberOfRegions)
    throws KeeperException, IOException {
  MonitoredTask status = TaskMonitor.get().createStatus(
      "Creating schema change node for table = " + tableName);
  LOG.debug("Creating schema change node for table = "
      + tableName + " Path = "
      + getSchemaChangeNodePathForTable(tableName));
  if (doesSchemaChangeNodeExists(tableName)) {
    LOG.debug("Schema change node already exists for table = " + tableName
        + " Waiting for the previous schema change to complete.");
    // If we already see a schema change node for this table we wait till the previous
    // alter process is complete. Ideally, we need not wait and we could simply delete
    // existing schema change node for this table and create new one. But then the
    // RS cloud will not be able to process concurrent schema updates for the same table
    // as they will be working with same set of online regions for this table. Meaning the
    // second alter change will not see any online regions (as they were being closed and
    // re opened by the first change) and will miss the second one.
    // We either handle this at the RS level using explicit locks while processing a table
    // or do it here. I prefer doing it here as it seems much simpler and cleaner.
    while (doesSchemaChangeNodeExists(tableName)) {
      try {
        Thread.sleep(50);
      } catch (InterruptedException e) {
        // Restore the flag and stop waiting. Continuing the loop with the
        // interrupt flag set would make every subsequent sleep() throw
        // immediately and turn this wait into a hot spin against ZooKeeper.
        Thread.currentThread().interrupt();
        String msg = "Interrupted while waiting for the previous schema change"
            + " to complete for table = " + tableName;
        status.abort(msg);
        java.io.InterruptedIOException iioe = new java.io.InterruptedIOException(msg);
        iioe.initCause(e);
        throw iioe;
      }
    }
  }
  int rsCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.rsZNode);
  // if number of online RS = 0, we should not do anything!
  if (rsCount <= 0) {
    String msg = "Master is not seeing any online region servers. Aborting the " +
        "schema change processing by region servers.";
    LOG.debug(msg);
    status.abort(msg);
  } else {
    LOG.debug("Master is seeing " + rsCount + " region servers online before " +
        "the schema change process.");
    MasterAlterStatus mas = new MasterAlterStatus(numberOfRegions,
        getActiveRegionServersAsString());
    LOG.debug("Master creating the master alter status = " + mas);
    ZKUtil.createSetData(this.watcher,
        getSchemaChangeNodePathForTable(tableName), Writables.getBytes(mas));
    status.markComplete("Created the ZK node for schema change. Current Alter Status = "
        + mas.toString());
    // Watch the new table node's children so RS progress nodes are noticed.
    ZKUtil.listChildrenAndWatchThem(this.watcher,
        getSchemaChangeNodePathForTable(tableName));
  }
}
/**
 * Build a single string holding the names of the region servers currently
 * reported online by the region server tracker. Each name is followed by one
 * space.
 *
 * @return the space separated server names; empty when no server is online
 */
private String getActiveRegionServersAsString() {
  List<ServerName> onlineServers =
      masterServices.getRegionServerTracker().getOnlineServers();
  StringBuilder joined = new StringBuilder();
  for (ServerName online : onlineServers) {
    joined.append(online.getServerName()).append(" ");
  }
  String serverList = joined.toString();
  LOG.debug("Current list of RS to process the schema change = "
      + serverList);
  return serverList;
}
/**
 * Check whether a schema change ZK node currently exists for a table.
 *
 * @param tableName table whose schema change node is looked up
 * @return true if {@code ZKUtil.checkExists} reports anything other than -1
 *         for the table's schema change node path
 * @throws KeeperException propagated from the ZK lookup
 */
public boolean doesSchemaChangeNodeExists(String tableName)
    throws KeeperException {
  String tableNodePath = getSchemaChangeNodePathForTable(tableName);
  int lookupResult = ZKUtil.checkExists(watcher, tableNodePath);
  return lookupResult != -1;
}
/**
 * Check whether there are any schema change requests that are in progress now.
 * We simply assume that a schema change is in progress if we see a ZK schema node for
 * any table. We may revisit for fine grained checks such as check the current alter status
 * et al, but it is not required now.
 *
 * @return true if the schema znode has at least one child; false if it has
 *         none or if ZooKeeper could not be queried
 */
public boolean isSchemaChangeInProgress() {
  try {
    int schemaChangeCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.schemaZNode);
    return schemaChangeCount > 0;
  } catch (KeeperException ke) {
    // Best effort: callers get "false" when ZK cannot be queried. Keep the
    // exception in the log so the wrong answer can be diagnosed.
    LOG.warn("KeeperException while getting current schema change progress.", ke);
    return false;
  }
}
/**
 * We get notified when a RS processes/or completed the schema change request.
 * The path will be of the format /hbase/schema/&lt;table name&gt;.
 * Events for the schema znode itself and for paths outside of it are
 * ignored. Failures are logged and reported through the task monitor.
 *
 * @param path full path of the node whose children have changed
 */
@Override
public void nodeChildrenChanged(String path) {
  String tableName = null;
  if (path.startsWith(watcher.schemaZNode) &&
      !path.equals(watcher.schemaZNode)) {
    try {
      LOG.debug("NodeChildrenChanged Path = " + path);
      // The table name is the last path component.
      tableName = path.substring(path.lastIndexOf("/") + 1);
      processTableNode(tableName);
    } catch (KeeperException e) {
      TaskMonitor.get().createStatus(
          "MasterSchemaChangeTracker: ZK exception while processing " +
          " nodeChildrenChanged() event for table = " + tableName
          + " Cause = " + e.getCause());
      LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
          + " schema change nodes", e);
    } catch (IOException ioe) {
      // Report this as an IO failure (it was previously mislabelled as a ZK exception).
      TaskMonitor.get().createStatus(
          "MasterSchemaChangeTracker: IO exception while processing " +
          " nodeChildrenChanged() event for table = " + tableName
          + " Cause = " + ioe.getCause());
      LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
          + " schema change nodes", ioe);
    }
  }
}
/**
 * We get notified as and when the RS cloud updates their ZK nodes with
 * progress information. The path will be of the format
 * /hbase/schema/&lt;table name&gt;/&lt;RS host name&gt;.
 * The table name is taken as the first path component below the schema
 * znode, so it is resolved correctly regardless of how deep the configured
 * base znode is. Paths that are not below the schema znode are ignored.
 *
 * @param path full path of the node whose data has changed
 */
@Override
public void nodeDataChanged(String path) {
  String schemaPrefix = watcher.schemaZNode + "/";
  if (!path.startsWith(schemaPrefix)) {
    return;
  }
  String tableName = null;
  try {
    LOG.debug("NodeDataChanged Path = " + path);
    // Resolve the table relative to the schema znode rather than by a fixed
    // index into the absolute path.
    String relativePath = path.substring(schemaPrefix.length());
    int separator = relativePath.indexOf('/');
    tableName = separator < 0 ? relativePath : relativePath.substring(0, separator);
    if (tableName.length() == 0) {
      return;
    }
    processTableNode(tableName);
  } catch (KeeperException e) {
    TaskMonitor.get().createStatus(
        "MasterSchemaChangeTracker: ZK exception while processing " +
        " nodeDataChanged() event for table = " + tableName
        + " Cause = " + e.getCause());
    LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
        + " schema change nodes", e);
  } catch (IOException ioe) {
    TaskMonitor.get().createStatus(
        "MasterSchemaChangeTracker: IO exception while processing " +
        " nodeDataChanged() event for table = " + tableName
        + " Cause = " + ioe.getCause());
    LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
        + " schema change nodes", ioe);
  }
}
/**
 * Build the znode path of a table's schema change node by joining the
 * schema znode with the table name.
 *
 * @param tableName table name used as the child node name
 * @return path of the table's node under the schema znode
 */
public String getSchemaChangeNodePathForTable(String tableName) {
  final String schemaRoot = watcher.schemaZNode;
  return ZKUtil.joinZNode(schemaRoot, tableName);
}
/**
 * Test hook only; do not use in production code. Sets a delay that is
 * applied before the master cleans up the nodes of a processed table, so
 * that tests (see TestInstantSchemaChange) can exercise complex scenarios
 * such as master failover while a schema change is still being completed.
 *
 * @param sleepTimeMillis delay in milliseconds; values &lt;= 0 disable the delay
 */
public void setSleepTimeMillis(int sleepTimeMillis) {
  this.sleepTimeMillis = sleepTimeMillis;
}
/**
 * Build the znode path of a region server's node below a table's schema
 * change node.
 *
 * @param tableName table undergoing the schema change
 * @param regionServerName region server name used as the child node name
 * @return path of the server's node under the table's schema change node
 */
private String getSchemaChangeNodePathForTableAndServer(
    String tableName, String regionServerName) {
  final String tableNodePath = getSchemaChangeNodePathForTable(tableName);
  return ZKUtil.joinZNode(tableNodePath, regionServerName);
}
/**
 * Holds the current alter state for a table as tracked by the master. The
 * state consists of the current alter status (INPROCESS, SUCCESS or
 * FAILURE), the timestamp of the alter request, the list of hosts that were
 * online at the time of the request, the number of online regions to process
 * for the schema change request, the number of processed regions, the region
 * servers that reported progress and any error causes they reported.
 *
 * Master keeps track of schema change requests using the alter status and
 * updates it from the per region server statuses via
 * {@link #update(SchemaChangeTracker.SchemaAlterStatus)}.
 */
public static class MasterAlterStatus implements Writable {
  public enum AlterState {
    INPROCESS,  // Inprocess alter
    SUCCESS,    // completed alter
    FAILURE     // failure alter
  }

  private AlterState currentAlterStatus;
  // Creation time of the alter request (System.currentTimeMillis()).
  private long stamp;
  private int numberOfRegionsToProcess;
  private StringBuffer errorCause = new StringBuffer(" ");
  private StringBuffer processedHosts = new StringBuffer(" ");
  // Space separated names of the servers expected to process the change.
  private String hostsToProcess;
  // NOTE(review): this counter is neither written by write() nor restored by
  // readFields(), so it does not survive a serialization round trip --
  // confirm whether that is intended.
  private int numberOfRegionsProcessed = 0;

  /** No-arg constructor; fields are expected to be populated via {@link #readFields}. */
  public MasterAlterStatus() {
  }

  /**
   * Create a status in the INPROCESS state stamped with the current time.
   * @param numberOfRegions number of regions to process
   * @param activeHosts names of the hosts expected to process the change
   */
  public MasterAlterStatus(int numberOfRegions, String activeHosts) {
    this.numberOfRegionsToProcess = numberOfRegions;
    this.stamp = System.currentTimeMillis();
    this.currentAlterStatus = AlterState.INPROCESS;
    this.hostsToProcess = activeHosts;
  }

  public AlterState getCurrentAlterStatus() {
    return currentAlterStatus;
  }

  public void setCurrentAlterStatus(AlterState currentAlterStatus) {
    this.currentAlterStatus = currentAlterStatus;
  }

  public long getStamp() {
    return stamp;
  }

  public void setStamp(long stamp) {
    this.stamp = stamp;
  }

  public int getNumberOfRegionsToProcess() {
    return numberOfRegionsToProcess;
  }

  public void setNumberOfRegionsToProcess(int numberOfRegionsToProcess) {
    this.numberOfRegionsToProcess = numberOfRegionsToProcess;
  }

  public int getNumberOfRegionsProcessed() {
    return numberOfRegionsProcessed;
  }

  /**
   * Despite its name this ADDS the given count to the running total; it is
   * used to accumulate the counts reported by individual region servers.
   * @param numberOfRegionsProcessed number of regions to add to the total
   */
  public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) {
    this.numberOfRegionsProcessed += numberOfRegionsProcessed;
  }

  public String getHostsToProcess() {
    return hostsToProcess;
  }

  public void setHostsToProcess(String hostsToProcess) {
    this.hostsToProcess = hostsToProcess;
  }

  public String getErrorCause() {
    return errorCause == null ? null : errorCause.toString();
  }

  /**
   * Append an error cause to the accumulated causes. Null or blank input is
   * ignored.
   * @param errorCause cause text to append
   */
  public void setErrorCause(String errorCause) {
    if (errorCause == null || errorCause.trim().length() <= 0) {
      return;
    }
    if (this.errorCause == null) {
      this.errorCause = new StringBuffer(errorCause);
    } else {
      this.errorCause.append(errorCause);
    }
  }

  public String getProcessedHosts() {
    return processedHosts.toString();
  }

  /**
   * Append a host to the list of processed hosts (space separated).
   * @param processedHosts host name to append
   */
  public void setProcessedHosts(String processedHosts) {
    if (this.processedHosts == null) {
      this.processedHosts = new StringBuffer(processedHosts);
    } else {
      this.processedHosts.append(" ").append(processedHosts);
    }
  }

  /**
   * Ignore or exempt a RS from schema change processing.
   * Master will tweak the number of regions to process based on the
   * number of online regions on the target RS and also remove the
   * RS from list of hosts to process.
   * @param schemaAlterStatus status reported for the RS being ignored
   */
  private void ignoreRSForSchemaChange(
      SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) {
    String ignoredHost = schemaAlterStatus.getHostName();
    LOG.debug("Removing RS " + ignoredHost
        + " from schema change process.");
    if (hostsToProcess != null && ignoredHost != null) {
      // Literal replacement: the host name must not be interpreted as a
      // regular expression (it typically contains '.' characters).
      hostsToProcess = hostsToProcess.replace(ignoredHost, "");
    }
    int ignoreRegionsCount = schemaAlterStatus.getNumberOfOnlineRegions();
    LOG.debug("Current number of regions processed = "
        + this.numberOfRegionsProcessed + " deducting ignored = "
        + ignoreRegionsCount
        + " final = " + (this.numberOfRegionsToProcess-ignoreRegionsCount));
    if (this.numberOfRegionsToProcess > 0) {
      this.numberOfRegionsToProcess -= ignoreRegionsCount;
    } else {
      LOG.debug("Number of regions to process is less than zero. This is odd");
    }
  }

  /**
   * Update the master alter status for this table based on RS alter status.
   * The reporting host is always appended to the processed hosts. FAILURE
   * and SUCCESS additionally accumulate the processed region count and set
   * the master state; IGNORED removes the RS from the work to be done.
   * @param schemaAlterStatus status reported by a region server
   */
  public void update(SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) {
    this.setProcessedHosts(schemaAlterStatus.getHostName());
    SchemaChangeTracker.SchemaAlterStatus.AlterState rsState =
        schemaAlterStatus.getCurrentAlterStatus();
    switch(rsState) {
      case FAILURE:
        LOG.debug("Schema update failure Status = "
            + schemaAlterStatus);
        this.setCurrentAlterStatus(
            MasterAlterStatus.AlterState.FAILURE);
        this.setNumberOfRegionsProcessed(
            schemaAlterStatus.getNumberOfRegionsProcessed());
        this.setErrorCause(schemaAlterStatus.getErrorCause());
        break;
      case SUCCESS:
        LOG.debug("Schema update SUCCESS Status = "
            + schemaAlterStatus);
        this.setNumberOfRegionsProcessed(
            schemaAlterStatus.getNumberOfRegionsProcessed());
        this.setCurrentAlterStatus(MasterAlterStatus.AlterState.SUCCESS);
        break;
      case IGNORED:
        LOG.debug("Schema update IGNORED Updating regions to " +
            "process count. Status = "+ schemaAlterStatus);
        ignoreRSForSchemaChange(schemaAlterStatus);
        break;
      default:
        break;
    }
  }

  /**
   * Restore state, timestamp, regions-to-process count, hosts to process,
   * processed hosts and error cause, in that order.
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    currentAlterStatus = AlterState.valueOf(in.readUTF());
    stamp = in.readLong();
    numberOfRegionsToProcess = in.readInt();
    hostsToProcess = Bytes.toString(Bytes.readByteArray(in));
    processedHosts = new StringBuffer(Bytes.toString(Bytes.readByteArray(in)));
    errorCause = new StringBuffer(Bytes.toString(Bytes.readByteArray(in)));
  }

  /** Serialize the fields in the order expected by {@link #readFields}. */
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(currentAlterStatus.name());
    out.writeLong(stamp);
    out.writeInt(numberOfRegionsToProcess);
    Bytes.writeByteArray(out, Bytes.toBytes(hostsToProcess));
    Bytes.writeByteArray(out, Bytes.toBytes(processedHosts.toString()));
    Bytes.writeByteArray(out, Bytes.toBytes(errorCause.toString()));
  }

  @Override
  public String toString() {
    return
        " state= " + currentAlterStatus
        + ", ts= " + stamp
        + ", number of regions to process = " + numberOfRegionsToProcess
        + ", number of regions processed = " + numberOfRegionsProcessed
        + ", hosts = " + hostsToProcess
        + " , processed hosts = " + processedHosts
        + " , errorCause = " + errorCause;
  }
}
}

View File

@ -0,0 +1,476 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
import java.io.*;
import java.util.List;
/**
* Region server schema change tracker. RS uses this tracker to keep track of
* alter schema requests from master and updates the status once the schema change
* is complete.
*/
public class SchemaChangeTracker extends ZooKeeperNodeTracker {
public static final Log LOG = LogFactory.getLog(SchemaChangeTracker.class);
private RegionServerServices regionServer = null;
private volatile int sleepTimeMillis = 0;
/**
 * Constructs a region server side schema change tracker rooted at the
 * watcher's schema znode.
 * <p>After construction, use {@link #start} to kick off tracking.
 *
 * @param watcher ZooKeeper watcher; its {@code schemaZNode} is the tracked node
 * @param abortable handed to the parent node tracker
 * @param regionServer region server services used to look up and refresh
 *        online regions
 */
public SchemaChangeTracker(ZooKeeperWatcher watcher,
    Abortable abortable,
    RegionServerServices regionServer) {
  super(watcher, watcher.schemaZNode, abortable);
  this.regionServer = regionServer;
}
/**
 * Register this tracker as a listener on the watcher and set watches on the
 * tracked node's children. A KeeperException during startup is logged and
 * not propagated.
 */
@Override
public void start() {
  try {
    watcher.registerListener(this);
    ZKUtil.listChildrenAndWatchThem(watcher, node);
    // Clean-up old in-process schema changes for this RS now?
  } catch (KeeperException startupFailure) {
    LOG.error("RegionServer SchemaChangeTracker startup failed with " +
        "KeeperException.", startupFailure);
  }
}
/**
 * This event will be triggered whenever new schema change request is processed by the
 * master. Only events for the schema znode itself are handled: the current
 * children (table names) are listed, re-watched and passed on for a schema
 * refresh. A KeeperException is logged and reported through the task monitor.
 *
 * @param path full path of the node whose children have changed
 */
@Override
public void nodeChildrenChanged(String path) {
  LOG.debug("NodeChildrenChanged. Path = " + path);
  if (!path.equals(watcher.schemaZNode)) {
    return;
  }
  try {
    List<String> alteredTables =
        ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode);
    LOG.debug("RS.SchemaChangeTracker: " +
        "Current list of tables with schema change = " + alteredTables);
    if (alteredTables == null) {
      LOG.error("No tables found for schema change event." +
          " Skipping instant schema refresh");
      return;
    }
    handleSchemaChange(alteredTables);
  } catch (KeeperException ke) {
    String failure = "KeeperException while handling nodeChildrenChanged for path = "
        + path + " Cause = " + ke.getCause();
    LOG.error(failure, ke);
    TaskMonitor.get().createStatus(failure);
  }
}
/**
 * Run the schema change handling for every table in the list, skipping null
 * entries.
 *
 * @param tables names of the tables that have a schema change node
 */
private void handleSchemaChange(List<String> tables) {
  for (String candidate : tables) {
    if (candidate == null) {
      continue;
    }
    LOG.debug("Processing schema change with status for table = " + candidate);
    handleSchemaChange(candidate);
  }
}
/**
 * Handle a schema change for one table on this region server: create this
 * server's state node under the table's schema change node, refresh every
 * online region of the table, and record SUCCESS with the number of
 * refreshed regions. Servers without online regions for the table ignore
 * the request. IOException and KeeperException are reported through
 * {@code reportAndLogSchemaRefreshError}.
 *
 * @param tableName table whose schema changed
 */
private void handleSchemaChange(String tableName) {
  int refreshedRegionsCount = 0, onlineRegionsCount = 0;
  MonitoredTask status = null;
  try {
    List<HRegion> onlineRegions =
        regionServer.getOnlineRegions(Bytes.toBytes(tableName));
    if (onlineRegions != null && !onlineRegions.isEmpty()) {
      status = TaskMonitor.get().createStatus("Region server "
          + regionServer.getServerName().getServerName()
          + " handling schema change for table = " + tableName
          + " number of online regions = " + onlineRegions.size());
      onlineRegionsCount = onlineRegions.size();
      createStateNode(tableName, onlineRegions.size());
      for (HRegion hRegion : onlineRegions) {
        regionServer.refreshRegion(hRegion);
        refreshedRegionsCount ++;
      }
      SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName);
      if (alterStatus == null) {
        // The state node could not be read back (the lookup returns null
        // when the node has no data). Report it instead of failing with a
        // NullPointerException on the ZK event thread.
        String errmsg = "No schema alter status found for table = " + tableName
            + " server = " + regionServer.getServerName().getServerName()
            + " Unable to record schema refresh status. refreshed Regions = "
            + refreshedRegionsCount;
        LOG.error(errmsg);
        status.setStatus(errmsg);
        return;
      }
      alterStatus.update(SchemaAlterStatus.AlterState.SUCCESS, refreshedRegionsCount);
      updateSchemaChangeStatus(tableName, alterStatus);
      String msg = "Refresh schema completed for table name = " + tableName
          + " server = " + regionServer.getServerName().getServerName()
          + " online Regions = " + onlineRegions.size()
          + " refreshed Regions = " + refreshedRegionsCount;
      LOG.debug(msg);
      status.setStatus(msg);
    } else {
      LOG.debug("Server " + regionServer.getServerName().getServerName()
          + " has no online regions for table = " + tableName
          + " Ignoring the schema change request");
    }
  } catch (IOException ioe) {
    reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
        refreshedRegionsCount, ioe, status);
  } catch (KeeperException ke) {
    reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
        refreshedRegionsCount, ke, status);
  }
}
/**
 * Look up the version of a znode by delegating to {@code ZKUtil.checkExists}.
 *
 * @param nodePath full path of the znode
 * @return the value reported by {@code ZKUtil.checkExists} for the path
 * @throws KeeperException propagated from the ZK lookup
 */
private int getZKNodeVersion(String nodePath) throws KeeperException {
  final int nodeVersion = ZKUtil.checkExists(this.watcher, nodePath);
  return nodeVersion;
}
/**
 * Record a schema refresh failure: mark this server's alter status node as
 * FAILURE (with the processed region count and an error message) and report
 * the message on the monitored task. Failures while writing the status to
 * ZK are logged and reported through the task monitor, not propagated.
 *
 * @param tableName table whose schema refresh failed
 * @param onlineRegionsCount number of online regions for the table
 * @param refreshedRegionsCount number of regions refreshed before the failure
 * @param exception the failure being reported
 * @param status monitored task to update; a new one is created when null
 */
private void reportAndLogSchemaRefreshError(String tableName,
    int onlineRegionsCount,
    int refreshedRegionsCount,
    Throwable exception,
    MonitoredTask status) {
  try {
    String errmsg =
        " Region Server " + regionServer.getServerName().getServerName()
        + " failed during schema change process. Cause = "
        + exception.getCause()
        + " Number of onlineRegions = " + onlineRegionsCount
        + " Processed regions = " + refreshedRegionsCount;
    SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName);
    if (alterStatus == null) {
      // There is no readable status node for this server (the lookup
      // returns null when the node has no data), so there is nothing to
      // update in ZK. Report locally instead of failing with an NPE.
      LOG.error("reportAndLogSchemaRefreshError() " +
          " No SchemaAlterStatus found for table = " + tableName
          + "." + errmsg, exception);
      if (status == null) {
        TaskMonitor.get().createStatus(errmsg);
      } else {
        status.setStatus(errmsg);
      }
      return;
    }
    alterStatus.update(SchemaAlterStatus.AlterState.FAILURE,
        refreshedRegionsCount, errmsg);
    String nodePath = getSchemaChangeNodePathForTableAndServer(tableName,
        regionServer.getServerName().getServerName());
    ZKUtil.updateExistingNodeData(this.watcher, nodePath,
        Writables.getBytes(alterStatus), getZKNodeVersion(nodePath));
    LOG.info("reportAndLogSchemaRefreshError() " +
        " Updated child ZKNode with SchemaAlterStatus = "
        + alterStatus + " for table = " + tableName);
    if (status == null) {
      status = TaskMonitor.get().createStatus(errmsg);
    } else {
      status.setStatus(errmsg);
    }
  } catch (KeeperException e) {
    // Retry ?
    String errmsg = "KeeperException while updating the schema change node with "
        + "error status for table = "
        + tableName + " server = "
        + regionServer.getServerName().getServerName()
        + " Cause = " + e.getCause();
    LOG.error(errmsg, e);
    TaskMonitor.get().createStatus(errmsg);
  } catch(IOException ioe) {
    // retry ??
    String errmsg = "IOException while updating the schema change node with "
        + "server name for table = "
        + tableName + " server = "
        + regionServer.getServerName().getServerName()
        + " Cause = " + ioe.getCause();
    TaskMonitor.get().createStatus(errmsg);
    LOG.error(errmsg, ioe);
  }
}
/**
 * Create (or overwrite the data of) this region server's state node under
 * the table's schema change node, holding a fresh alter status. A
 * KeeperException is logged and reported through the task monitor, not
 * propagated.
 *
 * @param tableName table undergoing the schema change
 * @param numberOfOnlineRegions number of regions of the table online on this RS
 * @throws IOException if the status cannot be serialized
 */
private void createStateNode(String tableName, int numberOfOnlineRegions)
    throws IOException {
  SchemaAlterStatus initialStatus =
      new SchemaAlterStatus(regionServer.getServerName().getServerName(),
          numberOfOnlineRegions);
  LOG.debug("Creating Schema Alter State node = " + initialStatus);
  try {
    String stateNodePath = getSchemaChangeNodePathForTableAndServer(tableName,
        regionServer.getServerName().getServerName());
    byte[] serializedStatus = Writables.getBytes(initialStatus);
    ZKUtil.createSetData(this.watcher, stateNodePath, serializedStatus);
  } catch (KeeperException ke) {
    String failure = "KeeperException while creating the schema change node with "
        + "server name for table = "
        + tableName + " server = "
        + regionServer.getServerName().getServerName()
        + " Message = " + ke.getCause();
    LOG.error(failure, ke);
    TaskMonitor.get().createStatus(failure);
  }
}
/**
 * Read this region server's alter status for a table from ZK.
 *
 * @param tableName table undergoing the schema change
 * @return the deserialized status, or null when the node data is null or empty
 * @throws KeeperException propagated from the ZK read
 * @throws IOException if the node data cannot be deserialized
 */
private SchemaAlterStatus getSchemaAlterStatus(String tableName)
    throws KeeperException, IOException {
  String stateNodePath = getSchemaChangeNodePathForTableAndServer(tableName,
      regionServer.getServerName().getServerName());
  byte[] rawStatus = ZKUtil.getData(this.watcher, stateNodePath);
  boolean noData = rawStatus == null || rawStatus.length <= 0;
  if (noData) {
    return null;
  }
  SchemaAlterStatus parsed = new SchemaAlterStatus();
  Writables.getWritable(rawStatus, parsed);
  return parsed;
}
/**
 * Write the given alter status into this region server's state node for the
 * table (using -1 as the version argument). When a positive sleep time has
 * been configured the update is delayed by that amount first. All
 * KeeperExceptions and IOExceptions raised while updating are logged and
 * reported through the task monitor rather than propagated.
 *
 * @param tableName table undergoing the schema change
 * @param schemaAlterStatus status to store
 * @throws KeeperException declared, but caught and logged inside this method
 * @throws IOException declared, but caught and logged inside this method
 */
private void updateSchemaChangeStatus(String tableName,
    SchemaAlterStatus schemaAlterStatus)
    throws KeeperException, IOException {
  try {
    if (sleepTimeMillis > 0) {
      LOG.debug("SchemaChangeTracker sleeping for "
          + sleepTimeMillis);
      try {
        Thread.sleep(sleepTimeMillis);
      } catch (InterruptedException interrupted) {
        Thread.currentThread().interrupt();
      }
    }
    String stateNodePath = getSchemaChangeNodePathForTableAndServer(tableName,
        regionServer.getServerName().getServerName());
    byte[] serializedStatus = Writables.getBytes(schemaAlterStatus);
    ZKUtil.updateExistingNodeData(this.watcher, stateNodePath,
        serializedStatus, -1);
    String completed = "Schema change tracker completed for table = " + tableName
        + " status = " + schemaAlterStatus;
    LOG.debug(completed);
    TaskMonitor.get().createStatus(completed);
  } catch (KeeperException.NoNodeException missingNode) {
    String failure = "KeeperException.NoNodeException while updating the schema "
        + "change node with server name for table = "
        + tableName + " server = "
        + regionServer.getServerName().getServerName()
        + " Cause = " + missingNode.getCause();
    TaskMonitor.get().createStatus(failure);
    LOG.error(failure, missingNode);
  } catch (KeeperException zkFailure) {
    // Retry ?
    String failure = "KeeperException while updating the schema change node with "
        + "server name for table = "
        + tableName + " server = "
        + regionServer.getServerName().getServerName()
        + " Cause = " + zkFailure.getCause();
    LOG.error(failure, zkFailure);
    TaskMonitor.get().createStatus(failure);
  } catch (IOException ioFailure) {
    String failure = "IOException while updating the schema change node with "
        + "server name for table = "
        + tableName + " server = "
        + regionServer.getServerName().getServerName()
        + " Cause = " + ioFailure.getCause();
    LOG.error(failure, ioFailure);
    TaskMonitor.get().createStatus(failure);
  }
}
/**
 * Build the znode path of a table's schema change node by joining the
 * schema znode with the table name.
 *
 * @param tableName table name used as the child node name
 * @return path of the table's node under the schema znode
 */
private String getSchemaChangeNodePathForTable(String tableName) {
  final String schemaRoot = watcher.schemaZNode;
  return ZKUtil.joinZNode(schemaRoot, tableName);
}
/**
 * Build the znode path of a region server's node below a table's schema
 * change node.
 *
 * @param tableName table undergoing the schema change
 * @param regionServerName region server name used as the child node name
 * @return path of the server's node under the table's schema change node
 */
private String getSchemaChangeNodePathForTableAndServer(
    String tableName, String regionServerName) {
  final String tableNodePath = getSchemaChangeNodePathForTable(tableName);
  return ZKUtil.joinZNode(tableNodePath, regionServerName);
}
/**
 * @return the delay, in milliseconds, applied before this RS updates its
 *         schema change status; 0 by default
 */
public int getSleepTimeMillis() {
  return this.sleepTimeMillis;
}
/**
 * Set a delay, in milliseconds, that is applied before this RS updates its
 * progress status. Intended for test cases only, to exercise complex
 * scenarios such as RS failures and RS exemption handling.
 *
 * @param sleepTimeMillis delay in milliseconds; values &lt;= 0 disable the delay
 */
public void setSleepTimeMillis(int sleepTimeMillis) {
  this.sleepTimeMillis = sleepTimeMillis;
}
/**
 * Check whether a schema change request is currently in progress for the
 * given table. A schema change is assumed to be in progress if the schema
 * znode has a child named after the table. Finer grained checks (such as
 * inspecting the current alter status) may be added later but are not
 * required now.
 *
 * @param tableName table to look for among the schema znode's children
 * @return true if a child with the table's name exists; false otherwise,
 *         including when the children cannot be listed
 */
public boolean isSchemaChangeInProgress(String tableName) {
  List<String> tablesBeingAltered;
  try {
    tablesBeingAltered = ZKUtil.listChildrenAndWatchThem(this.watcher,
        watcher.schemaZNode);
  } catch (KeeperException ke) {
    LOG.debug("isSchemaChangeInProgress. " +
        "KeeperException while getting current schema change progress.");
    return false;
  }
  if (tablesBeingAltered == null) {
    return false;
  }
  for (String alteredTable : tablesBeingAltered) {
    if (alteredTable.equals(tableName)) {
      return true;
    }
  }
  return false;
}
/**
 * Holds the current alter state of one region server for a table. The state
 * consists of the current alter status (INPROCESS, SUCCESS, FAILURE or
 * IGNORED), the RS host name, the timestamp at which the status was created,
 * the number of online regions this RS has for the given table, the number
 * of processed regions and an error cause in case the RS failed during the
 * schema change process.
 *
 * RS keeps track of schema change requests per table using the alter status
 * and updates it based on the outcome of the schema change.
 */
public static class SchemaAlterStatus implements Writable {

  public enum AlterState {
    INPROCESS,  // Inprocess alter
    SUCCESS,    // completed alter
    FAILURE,    // failure alter
    IGNORED     // Ignore the alter processing.
  }

  private AlterState currentAlterStatus;
  // Creation time of this status (System.currentTimeMillis()).
  private long stamp;
  private int numberOfOnlineRegions;
  private String errorCause = " ";
  private String hostName;
  private int numberOfRegionsProcessed = 0;

  /** No-arg constructor; fields are expected to be populated via {@link #readFields}. */
  public SchemaAlterStatus() {
  }

  /**
   * Create a status in the INPROCESS state stamped with the current time.
   * @param hostName name of the region server
   * @param numberOfOnlineRegions number of online regions for the table
   */
  public SchemaAlterStatus(String hostName, int numberOfOnlineRegions) {
    this.numberOfOnlineRegions = numberOfOnlineRegions;
    this.stamp = System.currentTimeMillis();
    this.currentAlterStatus = AlterState.INPROCESS;
    this.hostName = hostName;
  }

  public AlterState getCurrentAlterStatus() {
    return currentAlterStatus;
  }

  public void setCurrentAlterStatus(AlterState currentAlterStatus) {
    this.currentAlterStatus = currentAlterStatus;
  }

  public int getNumberOfOnlineRegions() {
    return numberOfOnlineRegions;
  }

  public void setNumberOfOnlineRegions(int numberOfRegions) {
    this.numberOfOnlineRegions = numberOfRegions;
  }

  public int getNumberOfRegionsProcessed() {
    return numberOfRegionsProcessed;
  }

  public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) {
    this.numberOfRegionsProcessed = numberOfRegionsProcessed;
  }

  public String getErrorCause() {
    return errorCause;
  }

  public void setErrorCause(String errorCause) {
    this.errorCause = errorCause;
  }

  public String getHostName() {
    return hostName;
  }

  public void setHostName(String hostName) {
    this.hostName = hostName;
  }

  /** Set the state, the processed region count and the error cause. */
  public void update(AlterState state, int numberOfRegions, String errorCause) {
    this.currentAlterStatus = state;
    this.numberOfRegionsProcessed = numberOfRegions;
    this.errorCause = errorCause;
  }

  /** Set the state and the processed region count; the error cause is kept. */
  public void update(AlterState state, int numberOfRegions) {
    this.currentAlterStatus = state;
    this.numberOfRegionsProcessed = numberOfRegions;
  }

  /** Set the state only. */
  public void update(AlterState state) {
    this.currentAlterStatus = state;
  }

  /** Copy state, processed region count and error cause from another status. */
  public void update(SchemaAlterStatus status) {
    this.currentAlterStatus = status.getCurrentAlterStatus();
    this.numberOfRegionsProcessed = status.getNumberOfRegionsProcessed();
    this.errorCause = status.getErrorCause();
  }

  /**
   * Restore state, timestamp, online region count, host name, processed
   * region count and error cause, in that order.
   */
  @Override
  public void readFields(DataInput in) throws IOException {
    currentAlterStatus = AlterState.valueOf(in.readUTF());
    stamp = in.readLong();
    numberOfOnlineRegions = in.readInt();
    hostName = Bytes.toString(Bytes.readByteArray(in));
    numberOfRegionsProcessed = in.readInt();
    errorCause = Bytes.toString(Bytes.readByteArray(in));
  }

  /**
   * Serialize the fields in the order expected by {@link #readFields}.
   * A null host name or error cause (both can be set through the setters
   * and the update methods) is written as an empty string instead of
   * failing with a NullPointerException.
   */
  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(currentAlterStatus.name());
    out.writeLong(stamp);
    out.writeInt(numberOfOnlineRegions);
    Bytes.writeByteArray(out, Bytes.toBytes(hostName == null ? "" : hostName));
    out.writeInt(numberOfRegionsProcessed);
    Bytes.writeByteArray(out, Bytes.toBytes(errorCause == null ? "" : errorCause));
  }

  @Override
  public String toString() {
    return
        " state= " + currentAlterStatus
        + ", ts= " + stamp
        + ", number of online regions = " + numberOfOnlineRegions
        + ", host= " + hostName + " processed regions = " + numberOfRegionsProcessed
        + ", errorCause = " + errorCause;
  }
}
}

View File

@ -100,6 +100,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
public String clusterIdZNode; public String clusterIdZNode;
// znode used for log splitting work assignment // znode used for log splitting work assignment
public String splitLogZNode; public String splitLogZNode;
// znode used to record table schema changes
public String schemaZNode;
// Certain ZooKeeper nodes need to be world-readable // Certain ZooKeeper nodes need to be world-readable
public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE = public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
@ -162,6 +164,7 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
ZKUtil.createAndFailSilent(this, drainingZNode); ZKUtil.createAndFailSilent(this, drainingZNode);
ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, tableZNode);
ZKUtil.createAndFailSilent(this, splitLogZNode); ZKUtil.createAndFailSilent(this, splitLogZNode);
ZKUtil.createAndFailSilent(this, schemaZNode);
} catch (KeeperException e) { } catch (KeeperException e) {
throw new ZooKeeperConnectionException( throw new ZooKeeperConnectionException(
prefix("Unexpected KeeperException creating base node"), e); prefix("Unexpected KeeperException creating base node"), e);
@ -211,6 +214,8 @@ public class ZooKeeperWatcher implements Watcher, Abortable {
conf.get("zookeeper.znode.clusterId", "hbaseid")); conf.get("zookeeper.znode.clusterId", "hbaseid"));
splitLogZNode = ZKUtil.joinZNode(baseZNode, splitLogZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
schemaZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.schema", "schema"));
} }
/** /**

View File

@ -771,15 +771,39 @@
</description> </description>
</property> </property>
<property> <property>
<name>hbase.coprocessor.abortonerror</name> <name>hbase.coprocessor.abortonerror</name>
<value>false</value> <value>false</value>
<description> <description>
Set to true to cause the hosting server (master or regionserver) to Set to true to cause the hosting server (master or regionserver) to
abort if a coprocessor throws a Throwable object that is not IOException or abort if a coprocessor throws a Throwable object that is not IOException or
a subclass of IOException. Setting it to true might be useful in development a subclass of IOException. Setting it to true might be useful in development
environments where one wants to terminate the server as soon as possible to environments where one wants to terminate the server as soon as possible to
simplify coprocessor failure analysis. simplify coprocessor failure analysis.
</description> </description>
</property>
<property>
<name>hbase.instant.schema.alter.enabled</name>
<value>false</value>
<description>Whether to handle alter schema changes instantly.
If enabled, all schema change alter operations will be instant, as the master will not
explicitly unassign/assign the impacted regions and instead will rely on Region servers to
refresh their schema changes. If enabled, the schema alter requests will survive
master or RS failures.
</description>
</property>
<property>
<name>hbase.instant.schema.janitor.period</name>
<value>120000</value>
<description>The period, in milliseconds, at which the Schema Janitor process wakes up
and sweeps all expired/failed schema change requests.
</description>
</property>
<property>
<name>hbase.instant.schema.alter.timeout</name>
<value>60000</value>
<description>Timeout in millis after which any pending schema alter request will be
considered as failed.
</description>
</property> </property>
<property> <property>
<name>hbase.online.schema.update.enable</name> <name>hbase.online.schema.update.enable</name>

View File

@ -0,0 +1,169 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class InstantSchemaChangeTestBase {
final Log LOG = LogFactory.getLog(getClass());
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected HBaseAdmin admin;
protected static MiniHBaseCluster miniHBaseCluster = null;
protected Configuration conf;
protected static MasterSchemaChangeTracker msct = null;
protected final byte [] row = Bytes.toBytes("row");
protected final byte [] qualifier = Bytes.toBytes("qualifier");
final byte [] value = Bytes.toBytes("value");
/**
 * Configures the shared testing utility (short message interval, client pause and retry
 * count, instant and online schema change enabled, a 10 second janitor period and a
 * 30 second alter timeout), then starts a mini cluster and creates the admin client.
 * Despite its name this runs before every test because it is annotated with @Before.
 * @throws Exception if the cluster or the admin client cannot be started
 */
@Before
public void setUpBeforeClass() throws Exception {
  final Configuration testConf = TEST_UTIL.getConfiguration();
  testConf.setInt("hbase.regionserver.msginterval", 100);
  testConf.setInt("hbase.client.pause", 250);
  testConf.setInt("hbase.client.retries.number", 6);
  testConf.setBoolean("hbase.instant.schema.alter.enabled", true);
  testConf.setBoolean("hbase.online.schema.update.enable", true);
  testConf.setInt("hbase.instant.schema.janitor.period", 10000);
  testConf.setInt("hbase.instant.schema.alter.timeout", 30000);

  // presumably 2 masters and 5 region servers -- TODO confirm against startMiniCluster
  miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5);
  msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
  this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
}
/**
 * Shuts down the mini cluster. Despite its name this runs after every test because it is
 * annotated with @After.
 * @throws Exception if the shutdown fails
 */
@After
public void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
 * Find a live RS that is currently holding an online region of the given table.
 * @param tableName name of the table whose regions are looked up
 * @return the first live region server that reports a non-empty list of online regions
 *         for the table, or null when no live region server does
 */
protected HRegionServer findRSWithOnlineRegionFor(String tableName) {
  for (JVMClusterUtil.RegionServerThread serverThread :
      miniHBaseCluster.getLiveRegionServerThreads()) {
    HRegionServer candidate = serverThread.getRegionServer();
    List<HRegion> hosted = candidate.getOnlineRegions(Bytes.toBytes(tableName));
    if (hosted == null || hosted.isEmpty()) {
      // This server holds nothing for the table; try the next one.
      continue;
    }
    return candidate;
  }
  return null;
}
/**
 * Waits for the schema change of the given table using a wait time of 10000 milliseconds.
 * @param tableName name of the table whose schema change node is watched
 * @throws KeeperException declared by the two-argument overload
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
protected void waitForSchemaChangeProcess(final String tableName)
    throws KeeperException, InterruptedException {
  final long defaultWaitMillis = 10000;
  waitForSchemaChangeProcess(tableName, defaultWaitMillis);
}
/**
 * Waits, for a bounded time, until the schema change ZK node of the table has been created
 * and then deleted again.
 * <p>
 * This is a pretty low cost signalling mechanism. It is quite possible that we will
 * miss the ZK node creation signal, as in some cases the schema change process
 * happens rather quickly; the polling thread would then wait for a node that never shows
 * up. To keep that from leaking a spinning thread past the end of the test, the poller is
 * a daemon thread, stops as soon as it is interrupted, and is interrupted once the wait
 * time has elapsed. The fool-proof strategy would be to directly listen for ZK events.
 * @param tableName name of the table whose schema change node is polled
 * @param waitTimeMills maximum time to wait in milliseconds; values that are not positive
 *        fall back to 10000
 * @throws KeeperException declared for callers; ZK errors seen by the poller are logged
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
protected void waitForSchemaChangeProcess(final String tableName, final long waitTimeMills)
    throws KeeperException, InterruptedException {
  LOG.info("Waiting for ZK node creation for table = " + tableName);
  final MasterSchemaChangeTracker msct =
    TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
  final Runnable r = new Runnable() {
    public void run() {
      try {
        while (!msct.doesSchemaChangeNodeExists(tableName)) {
          Thread.sleep(50);
        }
      } catch (KeeperException ke) {
        // As before, a ZK error ends this phase but not the whole wait.
        LOG.warn("Failed checking schema change node creation for table = " + tableName, ke);
      } catch (InterruptedException ie) {
        // The waiter gave up; stop polling instead of spinning on the interrupt flag.
        return;
      }
      LOG.info("Waiting for ZK node deletion for table = " + tableName);
      try {
        while (msct.doesSchemaChangeNodeExists(tableName)) {
          Thread.sleep(50);
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed checking schema change node deletion for table = " + tableName, ke);
      } catch (InterruptedException ie) {
        return;
      }
    }
  };
  Thread t = new Thread(r);
  // Never keep the JVM alive because of a poller that missed its signal.
  t.setDaemon(true);
  t.start();
  try {
    t.join(waitTimeMills > 0 ? waitTimeMills : 10000);
  } finally {
    if (t.isAlive()) {
      t.interrupt();
    }
  }
}
/**
 * Creates a table with the single family HConstants.CATALOG_FAMILY and asserts that the
 * number of tables reported by the admin grew by exactly one.
 * A null table listing is counted as zero tables both before and after the creation, so
 * a missing listing shows up as an assertion failure rather than a NullPointerException.
 * @param tableName name of the table to create
 * @return a handle to the newly created table
 * @throws IOException if listing or creating tables fails
 */
protected HTable createTableAndValidate(String tableName) throws IOException {
  conf = TEST_UTIL.getConfiguration();
  LOG.info("Start createTableAndValidate()");
  HTableDescriptor[] tablesBefore = admin.listTables();
  int numTables = (tablesBefore == null) ? 0 : tablesBefore.length;
  HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
    HConstants.CATALOG_FAMILY);
  HTableDescriptor[] tablesAfter = this.admin.listTables();
  assertEquals(numTables + 1, (tablesAfter == null) ? 0 : tablesAfter.length);
  LOG.info("created table = " + tableName);
  return ht;
}
}

View File

@ -0,0 +1,465 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestInstantSchemaChange extends InstantSchemaChangeTestBase {
final Log LOG = LogFactory.getLog(getClass());
/**
 * Issues a modifyTable request whose descriptor carries a new family, waits for the schema
 * change node to go away, and then checks a put/get round trip against the new family.
 */
@Test
public void testInstantSchemaChangeForModifyTable() throws IOException,
    KeeperException, InterruptedException {
  final String tableName = "testInstantSchemaChangeForModifyTable";
  conf = TEST_UTIL.getConfiguration();
  LOG.info("Start testInstantSchemaChangeForModifyTable()");
  HTable table = createTableAndValidate(tableName);

  final String newFamily = "newFamily";
  HTableDescriptor descriptor = new HTableDescriptor(tableName);
  descriptor.addFamily(new HColumnDescriptor(newFamily));
  admin.modifyTable(Bytes.toBytes(tableName), descriptor);
  waitForSchemaChangeProcess(tableName);
  assertFalse(msct.doesSchemaChangeNodeExists(tableName));

  // Write to and read back from the freshly added family.
  Put writeToNewFamily = new Put(row);
  writeToNewFamily.add(Bytes.toBytes(newFamily), qualifier, value);
  table.put(writeToNewFamily);
  Get readFromNewFamily = new Get(row);
  readFromNewFamily.addColumn(Bytes.toBytes(newFamily), qualifier);
  Result fetched = table.get(readFromNewFamily);
  byte[] stored = fetched.getValue(Bytes.toBytes(newFamily), qualifier);
  assertEquals(Bytes.compareTo(value, stored), 0);
  LOG.info("END testInstantSchemaChangeForModifyTable()");
}
/**
 * Adds a new family with addColumn, waits for the schema change node to go away, and then
 * checks a put/get round trip against the new family.
 */
@Test
public void testInstantSchemaChangeForAddColumn() throws IOException,
    KeeperException, InterruptedException {
  LOG.info("Start testInstantSchemaChangeForAddColumn() ");
  String tableName = "testSchemachangeForAddColumn";
  HTable ht = createTableAndValidate(tableName);
  String newFamily = "newFamily";
  HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
  admin.addColumn(Bytes.toBytes(tableName), hcd);
  waitForSchemaChangeProcess(tableName);
  assertFalse(msct.doesSchemaChangeNodeExists(tableName));
  Put put1 = new Put(row);
  put1.add(Bytes.toBytes(newFamily), qualifier, value);
  LOG.info("******** Put into new column family ");
  ht.put(put1);
  Get get1 = new Get(row);
  get1.addColumn(Bytes.toBytes(newFamily), qualifier);
  Result r = ht.get(get1);
  byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
  // Log the cell contents; concatenating a byte[] directly only prints its identity hash.
  LOG.info(" Value put = " + Bytes.toString(value)
    + " value from table = " + Bytes.toString(tvalue));
  int result = Bytes.compareTo(value, tvalue);
  assertEquals(0, result);
  LOG.info("End testInstantSchemaChangeForAddColumn() ");
}
/**
 * Modifies HConstants.CATALOG_FAMILY (max versions 99, block cache disabled), waits for the
 * schema change node to go away, and checks that the table descriptor of every region of
 * the table returned by the mini cluster shows the modified settings.
 */
@Test
public void testInstantSchemaChangeForModifyColumn() throws IOException,
    KeeperException, InterruptedException {
  LOG.info("Start testInstantSchemaChangeForModifyColumn() ");
  final String tableName = "testSchemachangeForModifyColumn";
  createTableAndValidate(tableName);

  HColumnDescriptor modified = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
  modified.setMaxVersions(99);
  modified.setBlockCacheEnabled(false);
  admin.modifyColumn(Bytes.toBytes(tableName), modified);
  waitForSchemaChangeProcess(tableName);
  assertFalse(msct.doesSchemaChangeNodeExists(tableName));

  List<HRegion> regionsOfTable = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
  for (HRegion region : regionsOfTable) {
    HColumnDescriptor seenByRegion =
      region.getTableDesc().getFamily(HConstants.CATALOG_FAMILY);
    assertFalse(seenByRegion.isBlockCacheEnabled());
    assertEquals(seenByRegion.getMaxVersions(), 99);
  }
  LOG.info("End testInstantSchemaChangeForModifyColumn() ");
}
/**
 * Creates a table with families A, B and C, deletes family C, waits for the schema change
 * node to go away, and checks that the table descriptor no longer contains family C.
 */
@Test
public void testInstantSchemaChangeForDeleteColumn() throws IOException,
    KeeperException, InterruptedException {
  LOG.info("Start testInstantSchemaChangeForDeleteColumn() ");
  final String tableName = "testSchemachangeForDeleteColumn";

  HTableDescriptor[] listed = admin.listTables();
  final int tablesBefore = (listed != null) ? listed.length : 0;

  final byte[][] families = {
    Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") };
  TEST_UTIL.createTable(Bytes.toBytes(tableName), families);
  listed = this.admin.listTables();
  assertEquals(tablesBefore + 1, listed.length);
  LOG.info("Table testSchemachangeForDeleteColumn created");

  admin.deleteColumn(tableName, "C");
  waitForSchemaChangeProcess(tableName);
  assertFalse(msct.doesSchemaChangeNodeExists(tableName));

  HTableDescriptor afterDelete = this.admin.getTableDescriptor(Bytes.toBytes(tableName));
  HColumnDescriptor deletedFamily = afterDelete.getFamily(Bytes.toBytes("C"));
  assertTrue(deletedFamily == null);
  LOG.info("End testInstantSchemaChangeForDeleteColumn() ");
}
/**
 * Disables a table, adds a family to it, and checks that no schema change node exists for
 * the table afterwards.
 */
@Test
public void testInstantSchemaChangeWhenTableIsNotEnabled() throws IOException,
    KeeperException {
  final String tableName = "testInstantSchemaChangeWhenTableIsDisabled";
  conf = TEST_UTIL.getConfiguration();
  LOG.info("Start testInstantSchemaChangeWhenTableIsDisabled()");
  createTableAndValidate(tableName);

  // Take the table offline before altering it.
  admin.disableTable(tableName);

  // Alter the disabled table.
  admin.addColumn(Bytes.toBytes(tableName), new HColumnDescriptor("newFamily"));

  MasterSchemaChangeTracker tracker =
    TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
  assertFalse(tracker.doesSchemaChangeNodeExists(tableName));
}
/**
 * Test that when two alter requests are received for a table back to back we don't miss
 * either of them: both added families must accept a put and return it on a get.
 * NOTE(review): the two Runnables are invoked with run() on the test thread, so the alter
 * requests are issued one after the other rather than truly concurrently.
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test
public void testConcurrentInstantSchemaChangeForAddColumn() throws IOException,
    KeeperException, InterruptedException {
  final String tableName = "testConcurrentInstantSchemaChangeForModifyTable";
  conf = TEST_UTIL.getConfiguration();
  LOG.info("Start testConcurrentInstantSchemaChangeForModifyTable()");
  HTable table = createTableAndValidate(tableName);

  Runnable addFamily1 = new Runnable() {
    public void run() {
      HColumnDescriptor family = new HColumnDescriptor("family1");
      try {
        admin.addColumn(Bytes.toBytes(tableName), family);
      } catch (IOException ioe) {
        ioe.printStackTrace();
      }
    }
  };
  Runnable addFamily2 = new Runnable() {
    public void run() {
      HColumnDescriptor family = new HColumnDescriptor("family2");
      try {
        admin.addColumn(Bytes.toBytes(tableName), family);
      } catch (IOException ioe) {
        ioe.printStackTrace();
      }
    }
  };

  addFamily1.run();
  // We have to add a sleep here as in concurrent scenarios the HTD update
  // in HDFS fails and returns with null HTD. This needs to be investigated,
  // but it doesn't impact the instant alter functionality in any way.
  Thread.sleep(100);
  addFamily2.run();
  waitForSchemaChangeProcess(tableName);

  // Round trip through the first added family.
  Put putFamily1 = new Put(row);
  putFamily1.add(Bytes.toBytes("family1"), qualifier, value);
  table.put(putFamily1);
  Get getFamily1 = new Get(row);
  getFamily1.addColumn(Bytes.toBytes("family1"), qualifier);
  Result fetchedFamily1 = table.get(getFamily1);
  byte[] storedFamily1 = fetchedFamily1.getValue(Bytes.toBytes("family1"), qualifier);
  assertEquals(Bytes.compareTo(value, storedFamily1), 0);

  Thread.sleep(10000);

  // Round trip through the second added family.
  Put putFamily2 = new Put(row);
  putFamily2.add(Bytes.toBytes("family2"), qualifier, value);
  table.put(putFamily2);
  Get getFamily2 = new Get(row);
  getFamily2.addColumn(Bytes.toBytes("family2"), qualifier);
  Result fetchedFamily2 = table.get(getFamily2);
  byte[] storedFamily2 = fetchedFamily2.getValue(Bytes.toBytes("family2"), qualifier);
  assertEquals(Bytes.compareTo(value, storedFamily2), 0);
  LOG.info("END testConcurrentInstantSchemaChangeForModifyTable()");
}
/**
 * Requests a load balancer run followed by an addColumn request, waits for the schema
 * change node to go away, and checks a put/get round trip against the new family.
 * NOTE(review): both Runnables are invoked with run() on the test thread, so the balancer
 * call has returned before the schema change is requested; the two do not overlap here.
 * @throws IOException
 * @throws InterruptedException
 * @throws KeeperException
 */
@Test
public void testConcurrentInstantSchemaChangeAndLoadBalancerRun() throws IOException,
    InterruptedException, KeeperException {
  final String tableName = "testInstantSchemaChangeWithLoadBalancerRunning";
  conf = TEST_UTIL.getConfiguration();
  LOG.info("Start testInstantSchemaChangeWithLoadBalancerRunning()");
  final String newFamily = "newFamily";
  HTable ht = createTableAndValidate(tableName);
  final MasterSchemaChangeTracker msct =
    TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
  Runnable balancer = new Runnable() {
    public void run() {
      // run the balancer now.
      miniHBaseCluster.getMaster().balance();
    }
  };
  Runnable schemaChanger = new Runnable() {
    public void run() {
      HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
      try {
        admin.addColumn(Bytes.toBytes(tableName), hcd);
      } catch (IOException ioe) {
        ioe.printStackTrace();
      }
    }
  };
  balancer.run();
  schemaChanger.run();
  waitForSchemaChangeProcess(tableName, 40000);
  assertFalse(msct.doesSchemaChangeNodeExists(tableName));
  Put put1 = new Put(row);
  put1.add(Bytes.toBytes(newFamily), qualifier, value);
  LOG.info("******** Put into new column family ");
  ht.put(put1);
  ht.flushCommits();
  LOG.info("******** Get from new column family ");
  Get get1 = new Get(row);
  get1.addColumn(Bytes.toBytes(newFamily), qualifier);
  Result r = ht.get(get1);
  byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
  // Log the cell contents; concatenating a byte[] directly only prints its identity hash.
  LOG.info(" Value put = " + Bytes.toString(value)
    + " value from table = " + Bytes.toString(tvalue));
  int result = Bytes.compareTo(value, tvalue);
  assertEquals(0, result);
  LOG.info("End testInstantSchemaChangeWithLoadBalancerRunning() ");
}
/**
 * Creates a schema change node by hand, requests a load balancer run, and asserts that the
 * master does not report the load balancer as running afterwards.
 * NOTE(review): nothing in this test waits for the hand-made node to expire, so the
 * expiry of failed/expired schema changes is not exercised here.
 */
@Test (timeout=70000)
public void testLoadBalancerBlocksDuringSchemaChangeRequests() throws KeeperException,
    IOException, InterruptedException {
  LOG.info("Start testConcurrentLoadBalancerSchemaChangeRequests() ");
  final MasterSchemaChangeTracker tracker =
    TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();

  // Simulate an in-flight schema change request by creating its node directly.
  final String pendingTable = "testLoadBalancerBlocks";
  tracker.createSchemaChangeNode(pendingTable, 0);
  assertTrue(tracker.doesSchemaChangeNodeExists(pendingTable));

  // Ask for an explicit balancer run on the test thread.
  miniHBaseCluster.getMaster().balance();

  // With a schema change pending the balancer must not be running.
  assertFalse(miniHBaseCluster.getMaster().isLoadBalancerRunning());
  LOG.debug("testConcurrentLoadBalancerSchemaChangeRequests Asserted");
  LOG.info("End testConcurrentLoadBalancerSchemaChangeRequests() ");
}
/**
 * Test that instant schema change blocks while LB is running.
 * <p>
 * Starts one thread that calls balance() on the master and one that issues an addColumn
 * request, then starts a checker thread that polls isLoadBalancerRunning() and inspects a
 * schema change node before and after the balancer run.
 * <p>
 * NOTE(review): the assertions inside the checker run on a separate thread, so a failure
 * there presumably does not fail this test; the test thread only joins the checker for one
 * second and asserts nothing itself -- confirm whether that is intended.
 * NOTE(review): the checker inspects the node "testSchemaChangeBlocks", while the table
 * altered here is "testInstantSchemaChangeBlocksDuringLoadBalancerRun" -- confirm which
 * name was meant.
 * @throws KeeperException
 * @throws IOException
 * @throws InterruptedException
 */
@Test (timeout=10000)
public void testInstantSchemaChangeBlocksDuringLoadBalancerRun() throws KeeperException,
IOException, InterruptedException {
final MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
final String tableName = "testInstantSchemaChangeBlocksDuringLoadBalancerRun";
conf = TEST_UTIL.getConfiguration();
LOG.info("Start testInstantSchemaChangeBlocksDuringLoadBalancerRun()");
final String newFamily = "newFamily";
createTableAndValidate(tableName);
// Test that the schema change request does not run while an in-flight LB run
// is in progress.
// First, request an explicit LB run.
Runnable balancer1 = new Runnable() {
public void run() {
// run the balancer now.
miniHBaseCluster.getMaster().balance();
}
};
Runnable schemaChanger = new Runnable() {
public void run() {
HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
try {
admin.addColumn(Bytes.toBytes(tableName), hcd);
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
};
// Unlike the sibling tests, both requests really run on their own threads here.
Thread t1 = new Thread(balancer1);
Thread t2 = new Thread(schemaChanger);
t1.start();
t2.start();
// check that they both happen concurrently
Runnable balancerCheck = new Runnable() {
public void run() {
// check whether balancer is running.
while(!miniHBaseCluster.getMaster().isLoadBalancerRunning()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
// NOTE(review): the flag is restored but the loop continues, so the next sleep()
// throws again right away and the loop spins until the condition changes.
Thread.currentThread().interrupt();
}
}
try {
assertFalse(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks"));
} catch (KeeperException ke) {
ke.printStackTrace();
}
LOG.debug("Load Balancer is now running or skipped");
while(miniHBaseCluster.getMaster().isLoadBalancerRunning()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false);
try {
assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks"));
} catch (KeeperException ke) {
// NOTE(review): the KeeperException is silently ignored here.
}
}
};
Thread t = new Thread(balancerCheck);
t.start();
// Give the checker at most one second; it is not stopped if it is still polling.
t.join(1000);
// Load balancer should not run now.
//assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false);
// Schema change request node should now exist.
// assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks"));
LOG.debug("testInstantSchemaChangeBlocksDuringLoadBalancerRun Asserted");
LOG.info("End testInstantSchemaChangeBlocksDuringLoadBalancerRun() ");
}
/**
 * To test the schema janitor (that it cleans expired/failed schema alter attempts) we
 * simply create a schema change node in ZK for a fake table (one that doesn't exist, with
 * a fake number of online regions), sleep for 40 seconds, and then check that the node is
 * gone. The alter timeout configured by the test setup is 30 seconds.
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test
public void testInstantSchemaJanitor() throws IOException,
    KeeperException, InterruptedException {
  LOG.info("testInstantSchemaWithFailedExpiredOperations() ");
  final String fakeTableName = "testInstantSchemaWithFailedExpiredOperations";
  final MasterSchemaChangeTracker tracker =
    TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();

  tracker.createSchemaChangeNode(fakeTableName, 10);
  LOG.debug(tracker.getSchemaChangeNodePathForTable(fakeTableName) + " created");

  // Leave enough time for the request to expire and for the janitor to sweep it.
  Thread.sleep(40000);

  assertFalse(tracker.doesSchemaChangeNodeExists(fakeTableName));
  LOG.debug(tracker.getSchemaChangeNodePathForTable(fakeTableName) + " deleted");
  LOG.info("END testInstantSchemaWithFailedExpiredOperations() ");
}
}

View File

@ -0,0 +1,303 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestInstantSchemaChangeFailover {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private HBaseAdmin admin;
private static MiniHBaseCluster miniHBaseCluster = null;
private Configuration conf;
private ZooKeeperWatcher zkw;
private static MasterSchemaChangeTracker msct = null;
private final byte [] row = Bytes.toBytes("row");
private final byte [] qualifier = Bytes.toBytes("qualifier");
final byte [] value = Bytes.toBytes("value");
/**
 * Configures the shared testing utility (short message interval, client pause and retry
 * count, online and instant schema change enabled, a 10 second janitor period and a
 * 30 second alter timeout), then starts a mini cluster and creates the admin client.
 * Despite its name this runs before every test because it is annotated with @Before.
 * @throws Exception if the cluster or the admin client cannot be started
 */
@Before
public void setUpBeforeClass() throws Exception {
  final Configuration testConf = TEST_UTIL.getConfiguration();
  testConf.setInt("hbase.regionserver.msginterval", 100);
  testConf.setInt("hbase.client.pause", 250);
  testConf.setInt("hbase.client.retries.number", 6);
  testConf.setBoolean("hbase.online.schema.update.enable", true);
  testConf.setBoolean("hbase.instant.schema.alter.enabled", true);
  testConf.setInt("hbase.instant.schema.janitor.period", 10000);
  testConf.setInt("hbase.instant.schema.alter.timeout", 30000);

  // presumably 2 masters and 5 region servers -- TODO confirm against startMiniCluster
  miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5);
  msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
  this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
}
/**
 * Shuts down the mini cluster. Despite its name this runs after every test because it is
 * annotated with @After.
 * @throws Exception if the shutdown fails
 */
@After
public void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
 * Waits, for at most 10 seconds, until the schema change ZK node of the table has been
 * created and then deleted again.
 * <p>
 * This is a pretty low cost signalling mechanism. It is quite possible that we will
 * miss the ZK node creation signal, as in some cases the schema change process
 * happens rather quickly; the polling thread would then wait for a node that never shows
 * up. To keep that from leaking a spinning thread past the end of the test, the poller is
 * a daemon thread, stops as soon as it is interrupted, and is interrupted once the wait
 * time has elapsed. The fool-proof strategy would be to directly listen for ZK events.
 * @param tableName name of the table whose schema change node is polled
 * @throws KeeperException declared for callers; ZK errors seen by the poller are logged
 * @throws InterruptedException if the calling thread is interrupted while waiting
 */
private void waitForSchemaChangeProcess(final String tableName)
    throws KeeperException, InterruptedException {
  LOG.info("Waiting for ZK node creation for table = " + tableName);
  final MasterSchemaChangeTracker msct =
    TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
  final Runnable r = new Runnable() {
    public void run() {
      try {
        while (!msct.doesSchemaChangeNodeExists(tableName)) {
          Thread.sleep(20);
        }
      } catch (KeeperException ke) {
        // As before, a ZK error ends this phase but not the whole wait.
        LOG.warn("Failed checking schema change node creation for table = " + tableName, ke);
      } catch (InterruptedException ie) {
        // The waiter gave up; stop polling instead of spinning on the interrupt flag.
        return;
      }
      LOG.info("Waiting for ZK node deletion for table = " + tableName);
      try {
        while (msct.doesSchemaChangeNodeExists(tableName)) {
          Thread.sleep(20);
        }
      } catch (KeeperException ke) {
        LOG.warn("Failed checking schema change node deletion for table = " + tableName, ke);
      } catch (InterruptedException ie) {
        return;
      }
    }
  };
  Thread t = new Thread(r);
  // Never keep the JVM alive because of a poller that missed its signal.
  t.setDaemon(true);
  t.start();
  try {
    t.join(10000);
  } finally {
    if (t.isAlive()) {
      t.interrupt();
    }
  }
}
/**
 * Abort region server 0 right after requesting an addColumn and see that the schema change
 * still succeeds: the new family must accept a put/get round trip and the schema change
 * node of the table must no longer exist in ZK.
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test (timeout=50000)
public void testInstantSchemaChangeWhileRSCrash() throws IOException,
    KeeperException, InterruptedException {
  LOG.info("Start testInstantSchemaChangeWhileRSCrash()");
  zkw = miniHBaseCluster.getMaster().getZooKeeperWatcher();
  final String tableName = "TestRSCrashDuringSchemaChange";
  HTable table = createTableAndValidate(tableName);

  admin.addColumn(Bytes.toBytes(tableName), new HColumnDescriptor("family2"));
  miniHBaseCluster.getRegionServer(0).abort("Killing while instant schema change");
  // Let the dust settle down
  Thread.sleep(10000);
  waitForSchemaChangeProcess(tableName);

  // Round trip through the added family.
  Put write = new Put(row);
  write.add(Bytes.toBytes("family2"), qualifier, value);
  table.put(write);
  Get read = new Get(row);
  read.addColumn(Bytes.toBytes("family2"), qualifier);
  Result fetched = table.get(read);
  byte[] stored = fetched.getValue(Bytes.toBytes("family2"), qualifier);
  int result2 = Bytes.compareTo(value, stored);
  assertEquals(result2, 0);

  // checkExists is expected to report -1 for the schema change node.
  String nodePath = msct.getSchemaChangeNodePathForTable(tableName);
  assertTrue(ZKUtil.checkExists(zkw, nodePath) == -1);
  LOG.info("result2 = " + result2);
  LOG.info("end testInstantSchemaChangeWhileRSCrash()");
}
/**
 * Bring down RS servers around a schema change. This test is the same as the RS crash test
 * above, except that one region server (index 4) is aborted before the schema change is
 * requested and another one (index 2) is aborted right after it. Starting a replacement
 * region server is currently disabled, see the comment in the body.
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test (timeout=70000)
public void testInstantSchemaChangeWhileRandomRSCrashAndStart() throws IOException,
    KeeperException, InterruptedException {
  LOG.info("Start testInstantSchemaChangeWhileRandomRSCrashAndStart()");
  miniHBaseCluster.getRegionServer(4).abort("Killing RS 4");
  // Start a new RS before schema change .
  // Commenting the start RS as it is failing with DFS user permission NPE.
  //miniHBaseCluster.startRegionServer();
  // Let the dust settle
  Thread.sleep(10000);

  final String tableName = "testInstantSchemaChangeWhileRandomRSCrashAndStart";
  HTable table = createTableAndValidate(tableName);
  admin.addColumn(Bytes.toBytes(tableName), new HColumnDescriptor("family2"));

  // Abort a second region server (index 2) right after the alter request.
  miniHBaseCluster.getRegionServer(2).abort("Killing RS 2");
  // Let the dust settle
  Thread.sleep(10000);
  waitForSchemaChangeProcess(tableName);
  assertFalse(msct.doesSchemaChangeNodeExists(tableName));

  // Round trip through the added family.
  Put write = new Put(row);
  write.add(Bytes.toBytes("family2"), qualifier, value);
  table.put(write);
  Get read = new Get(row);
  read.addColumn(Bytes.toBytes("family2"), qualifier);
  Result fetched = table.get(read);
  byte[] stored = fetched.getValue(Bytes.toBytes("family2"), qualifier);
  int result2 = Bytes.compareTo(value, stored);
  assertEquals(result2, 0);
  LOG.info("result2 = " + result2);
  LOG.info("end testInstantSchemaChangeWhileRandomRSCrashAndStart()");
}
/**
 * Test scenario where the primary master is brought down while processing an
 * alter request. This is a harder one, as it is very difficult to time the
 * abort so that it lands in the middle of the schema change.
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test (timeout=50000)
public void testInstantSchemaChangeWhileMasterFailover() throws IOException,
    KeeperException, InterruptedException {
  LOG.info("Start testInstantSchemaChangeWhileMasterFailover()");
  final String tableName = "testInstantSchemaChangeWhileMasterFailover";
  HTable ht = createTableAndValidate(tableName);
  HColumnDescriptor hcd = new HColumnDescriptor("family2");
  admin.addColumn(Bytes.toBytes(tableName), hcd);
  // Kill primary master now, shortly after the alter request was issued.
  Thread.sleep(50);
  miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
  // It may not be possible for us to check the schema change status
  // using waitForSchemaChangeProcess as our ZK session in MasterSchemaChangeTracker will be
  // lost when master dies and hence may not be accurate. So relying on old-fashioned
  // sleep here.
  Thread.sleep(25000);
  // Round-trip a value through the newly added family.
  final byte[] family2 = Bytes.toBytes("family2");
  Put put2 = new Put(row);
  put2.add(family2, qualifier, value);
  ht.put(put2);
  Get get2 = new Get(row);
  get2.addColumn(family2, qualifier);
  Result r2 = ht.get(get2);
  byte[] tvalue2 = r2.getValue(family2, qualifier);
  int result2 = Bytes.compareTo(value, tvalue2);
  // JUnit's signature is assertEquals(expected, actual).
  assertEquals(0, result2);
  LOG.info("result2 = " + result2);
  LOG.info("end testInstantSchemaChangeWhileMasterFailover()");
}
/**
 * Test master fail-over while a schema change request is pending in ZK.
 * A fake schema change request is created in ZK through the active master's
 * tracker and the primary master is then aborted, simulating a fail-over in
 * the middle of a schema change. The schema janitor of the new master is
 * expected to clean up the fake request once it has timed out.
 * @throws IOException
 * @throws KeeperException
 * @throws InterruptedException
 */
@Test
public void testInstantSchemaOperationsInZKForMasterFailover() throws IOException,
    KeeperException, InterruptedException {
  LOG.info("testInstantSchemaOperationsInZKForMasterFailover() ");
  final String table = "testInstantSchemaOperationsInZKForMasterFailover";
  conf = TEST_UTIL.getConfiguration();

  // Register the fake request with the tracker of the currently active master.
  MasterSchemaChangeTracker trackerBeforeFailover =
      TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
  trackerBeforeFailover.createSchemaChangeNode(table, 10);
  String createdPath = trackerBeforeFailover.getSchemaChangeNodePathForTable(table);
  LOG.debug(createdPath + " created");
  assertTrue(trackerBeforeFailover.doesSchemaChangeNodeExists(table));

  // Kill primary master now.
  Exception abortCause = new Exception("Schema exception");
  miniHBaseCluster.getMaster().abort("Aborting master now", abortCause);

  // Wait for 50 secs so that the schema janitor of the fail-over master can kick in
  // and clean up this failed/expired schema change request.
  Thread.sleep(50000);

  // The tracker is fetched again from whichever master the cluster reports now.
  MasterSchemaChangeTracker trackerAfterFailover =
      miniHBaseCluster.getMaster().getSchemaChangeTracker();
  assertFalse(trackerAfterFailover.doesSchemaChangeNodeExists(table));
  String deletedPath = trackerAfterFailover.getSchemaChangeNodePathForTable(table);
  LOG.debug(deletedPath + " deleted");
  LOG.info("END testInstantSchemaOperationsInZKForMasterFailover() ");
}
/**
 * Creates a table with the catalog family and asserts that the number of
 * tables listed by the admin grew by exactly one.
 *
 * @param tableName name of the table to create
 * @return handle to the newly created table
 * @throws IOException if listing or creating the table fails
 */
private HTable createTableAndValidate(String tableName) throws IOException {
  conf = TEST_UTIL.getConfiguration();
  LOG.info("Start createTableAndValidate()");
  // listTables() may hand back null; treat that as "no tables yet".
  HTableDescriptor[] before = admin.listTables();
  final int countBefore = (before == null) ? 0 : before.length;
  HTable created = TEST_UTIL.createTable(Bytes.toBytes(tableName),
      HConstants.CATALOG_FAMILY);
  HTableDescriptor[] after = this.admin.listTables();
  assertEquals(countBefore + 1, after.length);
  LOG.info("created table = " + tableName);
  return created;
}
}

View File

@ -0,0 +1,217 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
/**
 * Instant schema change tests covering a region server abort during a schema
 * change, a schema change that is expected to fail on the region server side,
 * and a table split requested right after a schema change.
 */
@Category(LargeTests.class)
public class TestInstantSchemaChangeSplit extends InstantSchemaChangeTestBase {

  final Log LOG = LogFactory.getLog(getClass());

  /**
   * The objective of the following test is to validate that schema exclusions happen properly.
   * When a RS dies or crashes mid-flight during a schema refresh, we would exclude
   * all online regions in that RS, as well as the RS itself, from the schema change process.
   *
   * @throws IOException
   * @throws KeeperException
   * @throws InterruptedException
   */
  @Test
  public void testInstantSchemaChangeExclusions() throws IOException,
      KeeperException, InterruptedException {
    MasterSchemaChangeTracker msct =
        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
    LOG.info("Start testInstantSchemaChangeExclusions() ");
    String tableName = "testInstantSchemaChangeExclusions";
    createTableAndValidate(tableName);
    HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
    hcd.setMaxVersions(99);
    hcd.setBlockCacheEnabled(false);
    HRegionServer hrs = findRSWithOnlineRegionFor(tableName);
    // Issue the alter first, then abort the RS that hosts a region of the table.
    admin.modifyColumn(Bytes.toBytes(tableName), hcd);
    hrs.abort("Aborting for tests");
    hrs.getSchemaChangeTracker().setSleepTimeMillis(20000);
    LOG.debug("Waiting for Schema Change process to complete");
    waitForSchemaChangeProcess(tableName, 15000);
    assertEquals(false, msct.doesSchemaChangeNodeExists(tableName));
    // Sleep for some time so that our region is reassigned to some other RS
    // by master.
    Thread.sleep(10000);
    List<HRegion> onlineRegions = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
    assertTrue(!onlineRegions.isEmpty());
    // Every online region of the table must carry the altered family settings.
    for (HRegion onlineRegion : onlineRegions) {
      HTableDescriptor htd = onlineRegion.getTableDesc();
      HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
      assertTrue(!tableHcd.isBlockCacheEnabled());
      assertEquals(99, tableHcd.getMaxVersions());
    }
    LOG.info("End testInstantSchemaChangeExclusions() ");
  }

  /**
   * This test validates that when a schema change request fails on the
   * RS side, we appropriately register the failure in the Master Schema change
   * tracker's node as well as capture the error cause.
   *
   * Currently an alter request fails if RS fails with an IO exception say due to
   * missing or incorrect codec. With instant schema change the same failure happens
   * and we register the failure with associated cause and also update the
   * monitor status appropriately.
   *
   * The region(s) will be orphaned in both the cases.
   *
   * @throws IOException
   * @throws KeeperException
   * @throws InterruptedException
   */
  @Test
  public void testInstantSchemaChangeWhileRSOpenRegionFailure() throws IOException,
      KeeperException, InterruptedException {
    MasterSchemaChangeTracker msct =
        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
    LOG.info("Start testInstantSchemaChangeWhileRSOpenRegionFailure() ");
    String tableName = "testInstantSchemaChangeWhileRSOpenRegionFailure";
    HTable ht = createTableAndValidate(tableName);
    // Spread the table over multiple regions (10 is passed to createMultiRegions).
    TEST_UTIL.createMultiRegions(conf, ht,
        HConstants.CATALOG_FAMILY, 10);
    // wait for all the regions to be assigned
    Thread.sleep(10000);
    List<HRegion> onlineRegions = miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
    LOG.info("Size of online regions = " + onlineRegions.size());
    HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
    hcd.setMaxVersions(99);
    hcd.setBlockCacheEnabled(false);
    // NOTE(review): SNAPPY is presumably the "missing or incorrect codec" that
    // makes the region open fail in the test environment - confirm.
    hcd.setCompressionType(Compression.Algorithm.SNAPPY);
    admin.modifyColumn(Bytes.toBytes(tableName), hcd);
    Thread.sleep(100);
    assertEquals(true, msct.doesSchemaChangeNodeExists(tableName));
    Thread.sleep(10000);
    // get the current alter status and validate that its failure with appropriate error msg.
    MasterSchemaChangeTracker.MasterAlterStatus mas = msct.getMasterAlterStatus(tableName);
    assertTrue(mas != null);
    assertEquals(MasterSchemaChangeTracker.MasterAlterStatus.AlterState.FAILURE,
        mas.getCurrentAlterStatus());
    assertTrue(mas.getErrorCause() != null);
    LOG.info("End testInstantSchemaChangeWhileRSOpenRegionFailure() ");
  }

  /**
   * Adds a column family and requests a split of the same table shortly
   * afterwards, then verifies that the new family accepts and returns data.
   * <p>
   * NOTE(review): both runnables are invoked synchronously through
   * {@code run()} on the test thread, so no extra threads are started here.
   * Presumably the overlap relies on the schema change still being processed
   * in the background when the split is requested - confirm.
   *
   * @throws IOException
   * @throws InterruptedException
   * @throws KeeperException
   */
  @Test
  public void testConcurrentInstantSchemaChangeAndSplit() throws IOException,
      InterruptedException, KeeperException {
    final String tableName = "testConcurrentInstantSchemaChangeAndSplit";
    conf = TEST_UTIL.getConfiguration();
    LOG.info("Start testConcurrentInstantSchemaChangeAndSplit()");
    final String newFamily = "newFamily";
    HTable ht = createTableAndValidate(tableName);
    // Spread the table over multiple regions (4 is passed to createMultiRegions).
    TEST_UTIL.createMultiRegions(conf, ht,
        HConstants.CATALOG_FAMILY, 4);
    TEST_UTIL.loadTable(ht, HConstants.CATALOG_FAMILY);

    Runnable splitter = new Runnable() {
      @Override
      public void run() {
        // run the splits now.
        try {
          LOG.info("Splitting table now ");
          admin.split(Bytes.toBytes(tableName));
        } catch (IOException e) {
          LOG.error("Split of table " + tableName + " failed", e);
        } catch (InterruptedException e) {
          // Preserve the interrupt status for the thread running this task.
          Thread.currentThread().interrupt();
          LOG.error("Interrupted while splitting table " + tableName, e);
        }
      }
    };
    Runnable schemaChanger = new Runnable() {
      @Override
      public void run() {
        HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
        try {
          admin.addColumn(Bytes.toBytes(tableName), hcd);
        } catch (IOException ioe) {
          LOG.error("Adding family " + newFamily + " to table " + tableName + " failed", ioe);
        }
      }
    };

    schemaChanger.run();
    Thread.sleep(50);
    splitter.run();
    waitForSchemaChangeProcess(tableName, 40000);

    final byte[] family = Bytes.toBytes(newFamily);
    Put put1 = new Put(row);
    put1.add(family, qualifier, value);
    LOG.info("******** Put into new column family ");
    ht.put(put1);
    ht.flushCommits();
    LOG.info("******** Get from new column family ");
    Get get1 = new Get(row);
    get1.addColumn(family, qualifier);
    Result r = ht.get(get1);
    byte[] tvalue = r.getValue(family, qualifier);
    // Render the bytes; concatenating a byte[] only prints its identity hash.
    LOG.info(" Value put = " + Bytes.toStringBinary(value)
        + " value from table = " + Bytes.toStringBinary(tvalue));
    int result = Bytes.compareTo(value, tvalue);
    // JUnit's signature is assertEquals(expected, actual).
    assertEquals(0, result);
    LOG.info("End testConcurrentInstantSchemaChangeAndSplit() ");
  }
}

View File

@ -41,12 +41,15 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager; import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility; import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.Reference; import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.ipc.HRegionInterface; import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.Store; import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables; import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
@ -138,11 +141,6 @@ public class TestCatalogJanitor {
this.asm = Mockito.mock(AssignmentManager.class); this.asm = Mockito.mock(AssignmentManager.class);
} }
@Override
public void checkTableModifiable(byte[] tableName) throws IOException {
//no-op
}
@Override @Override
public void createTable(HTableDescriptor desc, byte[][] splitKeys) public void createTable(HTableDescriptor desc, byte[][] splitKeys)
throws IOException { throws IOException {
@ -159,6 +157,11 @@ public class TestCatalogJanitor {
return null; return null;
} }
// Stub implementation: the body is empty, so both arguments are ignored and
// no check is performed.
public void checkTableModifiable(byte[] tableName,
EventHandler.EventType eventType)
throws IOException {
}
@Override @Override
public MasterFileSystem getMasterFileSystem() { public MasterFileSystem getMasterFileSystem() {
return this.mfs; return this.mfs;
@ -243,6 +246,14 @@ public class TestCatalogJanitor {
} }
}; };
} }
// Stub implementation: always returns null.
public MasterSchemaChangeTracker getSchemaChangeTracker() {
return null;
}
// Stub implementation: always returns null.
public RegionServerTracker getRegionServerTracker() {
return null;
}
} }
@Test @Test

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.util;
import java.io.IOException; import java.io.IOException;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ConcurrentSkipListMap;
@ -55,6 +56,13 @@ public class MockRegionServerServices implements RegionServerServices {
return this.regions.get(encodedRegionName); return this.regions.get(encodedRegionName);
} }
// Stub implementation: ignores tableName and always returns null (not an
// empty list), so callers must be prepared for a null result.
public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException {
return null;
}
// Stub implementation: the body is empty, so the given region is left untouched.
public void refreshRegion(HRegion hRegion) throws IOException {
}
@Override @Override
public void addToOnlineRegions(HRegion r) { public void addToOnlineRegions(HRegion r) {
this.regions.put(r.getRegionInfo().getEncodedName(), r); this.regions.put(r.getRegionInfo().getEncodedName(), r);