HBASE-5715 Revert 'Instant schema alter' for now, HBASE-4213

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1310012 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2012-04-05 18:57:09 +00:00
parent 569dc33d03
commit de4f4aa9e4
28 changed files with 130 additions and 3037 deletions
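For orientation before the per-file hunks: the reverted feature was switched on by a single master configuration key, read once at startup (see the removed getSupportInstantSchemaChanges() in the HMaster hunks below). A minimal, illustrative sketch of reading that flag with a plain Hadoop Configuration follows; the class name is hypothetical and not part of the commit.

// Illustrative sketch only (class name hypothetical): the master read this
// key at startup; when true, alter requests were fanned out through
// ZooKeeper instead of requiring the usual disable/alter/enable cycle.
import org.apache.hadoop.conf.Configuration;

public class InstantSchemaFlagSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    boolean enabled = conf.getBoolean("hbase.instant.schema.alter.enabled", false);
    System.out.println("instant schema alter enabled = " + enabled);
  }
}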

View File

@@ -309,12 +309,14 @@ public class LocalHBaseCluster {
    */
   public HMaster getActiveMaster() {
     for (JVMClusterUtil.MasterThread mt : masterThreads) {
+      if (mt.getMaster().isActiveMaster()) {
       // Ensure that the current active master is not stopped.
       // We don't want to return a stopping master as an active master.
       if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) {
         return mt.getMaster();
       }
+      }
     }
     return null;
   }

View File

@@ -144,12 +144,13 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
      * Constructor
      */
     EventType(int value) {}

-    public boolean isSchemaChangeEvent() {
+    public boolean isOnlineSchemaChangeSupported() {
       return (
         this.equals(EventType.C_M_ADD_FAMILY) ||
         this.equals(EventType.C_M_DELETE_FAMILY) ||
         this.equals(EventType.C_M_MODIFY_FAMILY) ||
-        this.equals(EventType.C_M_MODIFY_TABLE));
+        this.equals(EventType.C_M_MODIFY_TABLE)
+      );
     }
   }
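The rename above narrows the predicate to its actual meaning: which event types may be applied to a table that is still enabled. A standalone, hedged sketch of that gating follows; the enum is a stand-in for the real EventHandler.EventType, and the opt-in flag mirrors hbase.online.schema.update.enable from the TableEventHandler hunks below.

public class OnlineSchemaGateSketch {
  // Stand-in for org.apache.hadoop.hbase.executor.EventHandler.EventType.
  enum EventType {
    C_M_ADD_FAMILY, C_M_DELETE_FAMILY, C_M_MODIFY_FAMILY,
    C_M_MODIFY_TABLE, C_M_DELETE_TABLE;

    boolean isOnlineSchemaChangeSupported() {
      return this == C_M_ADD_FAMILY || this == C_M_DELETE_FAMILY
          || this == C_M_MODIFY_FAMILY || this == C_M_MODIFY_TABLE;
    }
  }

  // A schema-change event may proceed on an enabled table only when the
  // cluster opts in (hbase.online.schema.update.enable, default false).
  static boolean mayProceedOnEnabledTable(EventType et, boolean onlineUpdateEnabled) {
    return onlineUpdateEnabled && et.isOnlineSchemaChangeSupported();
  }

  public static void main(String[] args) {
    System.out.println(mayProceedOnEnabledTable(EventType.C_M_ADD_FAMILY, true));   // true
    System.out.println(mayProceedOnEnabledTable(EventType.C_M_DELETE_TABLE, true)); // false
  }
}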

View File

@@ -266,12 +266,4 @@ public interface HMasterInterface extends VersionedProtocol {
    * @return array of HTableDescriptor
    */
   public HTableDescriptor[] getHTableDescriptors(List<String> tableNames);
-
-  /**
-   * Returns the current running status of load balancer.
-   * @return True if LoadBalancer is running now else False.
-   */
-  public boolean isLoadBalancerRunning();
 }

View File

@@ -304,7 +304,7 @@ public class AssignmentManager extends ZooKeeperListener {
     List <HRegionInfo> hris =
       MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName);
     Integer pending = 0;
-    for(HRegionInfo hri : hris) {
+    for (HRegionInfo hri : hris) {
       String name = hri.getEncodedName();
       if (regionsToReopen.containsKey(name) || regionsInTransition.containsKey(name)) {
         pending++;

View File

@@ -71,7 +71,6 @@ import org.apache.hadoop.hbase.client.MetaScanner;
 import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor;
 import org.apache.hadoop.hbase.client.Result;
 import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
-import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType;
 import org.apache.hadoop.hbase.ipc.HBaseRPC;
@@ -91,6 +90,7 @@ import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler;
 import org.apache.hadoop.hbase.master.handler.TableEventHandler;
 import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler;
 import org.apache.hadoop.hbase.master.metrics.MasterMetrics;
+import org.apache.hadoop.hbase.master.RegionPlan;
 import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -107,7 +107,6 @@ import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.ClusterId;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -185,9 +184,6 @@ Server {
   // Cluster status zk tracker and local setter
   private ClusterStatusTracker clusterStatusTracker;

-  // Schema change tracker
-  private MasterSchemaChangeTracker schemaChangeTracker;
-
   // buffer for "fatal error" notices from region servers
   // in the cluster. This is only used for assisting
   // operations/debugging.
@@ -215,18 +211,12 @@ Server {
   private CatalogJanitor catalogJanitorChore;
   private LogCleaner logCleaner;

-  private Thread schemaJanitorChore;
-
   private MasterCoprocessorHost cpHost;
   private final ServerName serverName;
   private TableDescriptors tableDescriptors;

-  // Whether or not schema alter changes go through ZK or not.
-  private boolean supportInstantSchemaChanges = false;
-
-  private volatile boolean loadBalancerRunning = false;
-
   // Time stamps for when a hmaster was started and when it became active
   private long masterStartTime;
   private long masterActiveTime;
@@ -300,18 +290,6 @@ Server {
     this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true);
     this.rpcServer.startThreads();
     this.metrics = new MasterMetrics(getServerName().toString());
-    this.supportInstantSchemaChanges = getSupportInstantSchemaChanges(conf);
-  }
-
-  /**
-   * Get whether instant schema change is on or not.
-   * @param c
-   * @return True if instant schema enabled.
-   */
-  private boolean getSupportInstantSchemaChanges(final Configuration c) {
-    boolean b = c.getBoolean("hbase.instant.schema.alter.enabled", false);
-    LOG.debug("Instant schema change enabled=" + b + ".");
-    return b;
   }

   /**
@@ -451,12 +429,6 @@ Server {
     boolean wasUp = this.clusterStatusTracker.isClusterUp();
     if (!wasUp) this.clusterStatusTracker.setClusterUp();

-    // initialize schema change tracker
-    this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(),
-      this, this,
-      conf.getInt("hbase.instant.schema.alter.timeout", 60000));
-    this.schemaChangeTracker.start();
-
     LOG.info("Server active/primary master; " + this.serverName +
       ", sessionid=0x" +
       Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) +
@@ -596,9 +568,6 @@ Server {
     this.catalogJanitorChore = new CatalogJanitor(this, this);
     Threads.setDaemonThreadRunning(catalogJanitorChore.getThread());

-    // Schema janitor chore.
-    this.schemaJanitorChore = getAndStartSchemaJanitorChore(this);
-
     registerMBean();

     status.markComplete("Initialization successful");
@@ -811,15 +780,6 @@ Server {
     return this.tableDescriptors;
   }

-  @Override
-  public MasterSchemaChangeTracker getSchemaChangeTracker() {
-    return this.schemaChangeTracker;
-  }
-
-  public RegionServerTracker getRegionServerTracker() {
-    return this.regionServerTracker;
-  }
-
   /** @return InfoServer object. Maybe null.*/
   public InfoServer getInfoServer() {
     return this.infoServer;
@@ -931,28 +891,7 @@ Server {
     if (this.executorService != null) this.executorService.shutdown();
   }

-  /**
-   * Start the schema janitor. This Janitor will periodically sweep the failed/expired schema
-   * changes.
-   * @param master
-   * @return
-   */
-  private Thread getAndStartSchemaJanitorChore(final HMaster master) {
-    String name = master.getServerName() + "-SchemaJanitorChore";
-    int schemaJanitorPeriod =
-      master.getConfiguration().getInt("hbase.instant.schema.janitor.period", 120000);
-    // Start up the schema janitor chore
-    Chore chore = new Chore(name, schemaJanitorPeriod, master) {
-      @Override
-      protected void chore() {
-        master.getSchemaChangeTracker().handleFailedOrExpiredSchemaChanges();
-      }
-    };
-    return Threads.setDaemonThreadRunning(chore.getThread());
-  }
-
-  private Thread getAndStartBalancerChore(final HMaster master) {
+  private static Thread getAndStartBalancerChore(final HMaster master) {
     String name = master.getServerName() + "-BalancerChore";
     int balancerPeriod =
       master.getConfiguration().getInt("hbase.balancer.period", 300000);
@@ -973,10 +912,6 @@ Server {
     if (this.catalogJanitorChore != null) {
       this.catalogJanitorChore.interrupt();
     }
-
-    if (this.schemaJanitorChore != null) {
-      this.schemaJanitorChore.interrupt();
-    }
   }

   @Override
@@ -1058,15 +993,6 @@ Server {
     return balancerCutoffTime;
   }

-  /**
-   * Check whether the Load Balancer is currently running.
-   * @return true if the Load balancer is currently running.
-   */
-  public boolean isLoadBalancerRunning() {
-    return loadBalancerRunning;
-  }
-
   @Override
   public boolean balance() {
     // If balance not true, don't run balancer.
@@ -1074,13 +1000,8 @@ Server {
     // Do this call outside of synchronized block.
     int maximumBalanceTime = getBalancerCutoffTime();
     long cutoffTime = System.currentTimeMillis() + maximumBalanceTime;
-    boolean balancerRan = false;
+    boolean balancerRan;
     synchronized (this.balancer) {
-      if (loadBalancerRunning) {
-        LOG.debug("Load balancer is currently running. Skipping the current execution.");
-        return false;
-      }
       // Only allow one balance run at at time.
       if (this.assignmentManager.isRegionsInTransition()) {
         LOG.debug("Not running balancer because " +
@@ -1095,12 +1016,7 @@ Server {
           this.serverManager.getDeadServers());
         return false;
       }
-      if (schemaChangeTracker.isSchemaChangeInProgress()) {
-        LOG.debug("Schema change operation is in progress. Waiting for " +
-          "it to complete before running the load balancer.");
-        return false;
-      }
-      loadBalancerRunning = true;
+
       if (this.cpHost != null) {
         try {
           if (this.cpHost.preBalance()) {
@@ -1148,7 +1064,6 @@ Server {
           LOG.error("Error invoking master coprocessor postBalance()", ioe);
         }
       }
-      loadBalancerRunning = false;
     }
     return balancerRan;
   }
@@ -1298,9 +1213,7 @@ Server {
     if (cpHost != null) {
       cpHost.preDeleteTable(tableName);
     }
-    this.executorService.submit(new DeleteTableHandler(tableName, this, this, this,
-      supportInstantSchemaChanges));
+    this.executorService.submit(new DeleteTableHandler(tableName, this, this));
     if (cpHost != null) {
       cpHost.postDeleteTable(tableName);
     }
@@ -1312,6 +1225,7 @@ Server {
    * @return Pair indicating the number of regions updated Pair.getFirst is the
    *         regions that are yet to be updated Pair.getSecond is the total number
    *         of regions of the table
+   * @throws IOException
    */
   public Pair<Integer, Integer> getAlterStatus(byte[] tableName)
   throws IOException {
@@ -1319,44 +1233,9 @@ Server {
     // may overlap with other table operations or the table operation may
     // have completed before querying this API. We need to refactor to a
     // transaction system in the future to avoid these ambiguities.
-    if (supportInstantSchemaChanges) {
-      return getAlterStatusFromSchemaChangeTracker(tableName);
-    }
     return this.assignmentManager.getReopenStatus(tableName);
   }

-  /**
-   * Used by the client to identify if all regions have the schema updates
-   *
-   * @param tableName
-   * @return Pair indicating the status of the alter command
-   * @throws IOException
-   */
-  private Pair<Integer, Integer> getAlterStatusFromSchemaChangeTracker(byte[] tableName)
-      throws IOException {
-    MasterSchemaChangeTracker.MasterAlterStatus alterStatus = null;
-    try {
-      alterStatus =
-        this.schemaChangeTracker.getMasterAlterStatus(Bytes.toString(tableName));
-    } catch (KeeperException ke) {
-      LOG.error("KeeperException while getting schema alter status for table = "
-        + Bytes.toString(tableName), ke);
-    }
-    if (alterStatus != null) {
-      LOG.debug("Getting AlterStatus from SchemaChangeTracker for table = "
-        + Bytes.toString(tableName) + " Alter Status = "
-        + alterStatus.toString());
-      return new Pair<Integer, Integer>(alterStatus.getNumberOfRegionsProcessed(),
-        alterStatus.getNumberOfRegionsToProcess());
-    } else {
-      LOG.debug("MasterAlterStatus is NULL for table = "
-        + Bytes.toString(tableName));
-      // should we throw IOException here as it makes more sense?
-      return new Pair<Integer, Integer>(0,0);
-    }
-  }
-
   public void addColumn(byte [] tableName, HColumnDescriptor column)
       throws IOException {
     checkInitialized();
@@ -1365,8 +1244,7 @@ Server {
         return;
       }
     }
-    new TableAddFamilyHandler(tableName, column, this, this,
-      this, supportInstantSchemaChanges).process();
+    new TableAddFamilyHandler(tableName, column, this, this).process();
     if (cpHost != null) {
       cpHost.postAddColumn(tableName, column);
     }
@@ -1380,8 +1258,7 @@ Server {
         return;
       }
     }
-    new TableModifyFamilyHandler(tableName, descriptor, this, this,
-      this, supportInstantSchemaChanges).process();
+    new TableModifyFamilyHandler(tableName, descriptor, this, this).process();
     if (cpHost != null) {
       cpHost.postModifyColumn(tableName, descriptor);
     }
@@ -1395,8 +1272,7 @@ Server {
         return;
       }
     }
-    new TableDeleteFamilyHandler(tableName, c, this, this,
-      this, supportInstantSchemaChanges).process();
+    new TableDeleteFamilyHandler(tableName, c, this, this).process();
     if (cpHost != null) {
       cpHost.postDeleteColumn(tableName, c);
     }
@@ -1470,8 +1346,7 @@ Server {
     if (cpHost != null) {
       cpHost.preModifyTable(tableName, htd);
     }
-    TableEventHandler tblHandle = new ModifyTableHandler(tableName, htd, this,
-      this, this, supportInstantSchemaChanges);
+    TableEventHandler tblHandle = new ModifyTableHandler(tableName, htd, this, this);
     this.executorService.submit(tblHandle);
     tblHandle.waitForPersist();
@@ -1480,26 +1355,8 @@ Server {
     }
   }

-  private boolean isOnlineSchemaChangeAllowed() {
-    return conf.getBoolean(
-      "hbase.online.schema.update.enable", false);
-  }
-
   @Override
-  public void checkTableModifiable(final byte [] tableName,
-    EventHandler.EventType eventType)
-  throws IOException {
-    preCheckTableModifiable(tableName);
-    if (!eventType.isSchemaChangeEvent() ||
-      !isOnlineSchemaChangeAllowed()) {
-      if (!getAssignmentManager().getZKTable().
-        isDisabledTable(Bytes.toString(tableName))) {
-        throw new TableNotDisabledException(tableName);
-      }
-    }
-  }
-
-  private void preCheckTableModifiable(final byte[] tableName)
+  public void checkTableModifiable(final byte [] tableName)
   throws IOException {
     String tableNameStr = Bytes.toString(tableName);
     if (isCatalogTable(tableName)) {
@@ -1508,6 +1365,10 @@ Server {
     if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) {
       throw new TableNotFoundException(tableNameStr);
     }
+    if (!getAssignmentManager().getZKTable().
+        isDisabledTable(Bytes.toString(tableName))) {
+      throw new TableNotDisabledException(tableName);
+    }
   }

   public void clearFromTransition(HRegionInfo hri) {
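Taken together, the last two hunks define the post-revert checkTableModifiable() contract: reject catalog tables, reject missing tables, and require the table to be disabled. A hedged, standalone approximation follows; the string sets and plain IOExceptions are stand-ins for the HBase catalog lookups and the TableNotFoundException/TableNotDisabledException types.

import java.io.IOException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class CheckTableModifiableSketch {
  static void checkTableModifiable(String tableName, Set<String> existing,
      Set<String> disabled) throws IOException {
    if ("-ROOT-".equals(tableName) || ".META.".equals(tableName)) {
      // isCatalogTable(...) in the real code
      throw new IOException(tableName + " is a catalog table");
    }
    if (!existing.contains(tableName)) {
      // TableNotFoundException in the real code
      throw new IOException("table not found: " + tableName);
    }
    if (!disabled.contains(tableName)) {
      // TableNotDisabledException in the real code; TableEventHandler
      // tolerates this one case when online schema update is enabled
      throw new IOException("table not disabled: " + tableName);
    }
  }

  public static void main(String[] args) throws IOException {
    Set<String> existing = new HashSet<String>(Arrays.asList("t1"));
    Set<String> disabled = new HashSet<String>(Arrays.asList("t1"));
    checkTableModifiable("t1", existing, disabled); // passes all three checks
    System.out.println("t1 is modifiable");
  }
}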

View File

@@ -514,7 +514,7 @@ public class MasterFileSystem {
    */
   public HTableDescriptor addColumn(byte[] tableName, HColumnDescriptor hcd)
       throws IOException {
-    LOG.debug("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " +
+    LOG.info("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " +
       hcd.toString());
     HTableDescriptor htd = this.services.getTableDescriptors().get(tableName);
     if (htd == null) {

View File

@@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.TableDescriptors;
 import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.ExecutorService;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
 import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;

 /**
@@ -56,15 +55,12 @@ public interface MasterServices extends Server {
   public ExecutorService getExecutorService();

   /**
-   * Check table modifiable. i.e not ROOT or META and offlined for all commands except
-   * alter commands
-   * @param tableName
-   * @param eventType
-   * @throws IOException
+   * Check table is modifiable; i.e. exists and is offline.
+   * @param tableName Name of table to check.
+   * @throws TableNotDisabledException
+   * @throws TableNotFoundException
    */
-  public void checkTableModifiable(final byte [] tableName,
-    EventHandler.EventType eventType)
-  throws IOException;
+  public void checkTableModifiable(final byte [] tableName) throws IOException;

   /**
    * Create a table using the given table definition.
@@ -80,21 +76,8 @@ public interface MasterServices extends Server {
    */
   public TableDescriptors getTableDescriptors();

-  /**
-   * Get Master Schema change tracker
-   * @return
-   */
-  public MasterSchemaChangeTracker getSchemaChangeTracker();
-
-  /**
-   * Return the Region server tracker.
-   * @return RegionServerTracker
-   */
-  public RegionServerTracker getRegionServerTracker();
-
   /**
    * @return true if master enables ServerShutdownHandler;
    */
   public boolean isServerShutdownHandlerEnabled();
 }

View File

@@ -345,15 +345,6 @@ public class ServerManager {
     }
   }

-  /**
-   * Exclude a RS from any pending schema change process.
-   * @param serverName
-   */
-  private void excludeRegionServerFromSchemaChanges(final ServerName serverName) {
-    this.services.getSchemaChangeTracker()
-      .excludeRegionServerForSchemaChanges(serverName.getServerName());
-  }
-
   /*
    * Expire the passed server. Add it to list of deadservers and queue a
    * shutdown processing.
@@ -365,7 +356,6 @@ public class ServerManager {
       this.deadNotExpiredServers.add(serverName);
       return;
     }
-    excludeRegionServerFromSchemaChanges(serverName);
     if (!this.onlineServers.containsKey(serverName)) {
       LOG.warn("Received expiration of " + serverName +
         " but server is not currently online");

View File

@@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.catalog.MetaEditor;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.master.AssignmentManager;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -40,11 +39,9 @@ public class DeleteTableHandler extends TableEventHandler {
   private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class);

   public DeleteTableHandler(byte [] tableName, Server server,
-      final MasterServices masterServices, HMasterInterface masterInterface,
-      boolean instantChange)
+      final MasterServices masterServices)
       throws IOException {
-    super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices,
-      masterInterface, instantChange);
+    super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices);
     // The next call fails if no such table.
     getTableDescriptor();
   }

View File

@@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.master.MasterServices;

 @InterfaceAudience.Private
@@ -35,11 +34,9 @@ public class ModifyTableHandler extends TableEventHandler {
   public ModifyTableHandler(final byte [] tableName,
       final HTableDescriptor htd, final Server server,
-      final MasterServices masterServices, final HMasterInterface masterInterface,
-      boolean instantModify)
+      final MasterServices masterServices)
       throws IOException {
-    super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices,
-      masterInterface, instantModify);
+    super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices);
     // Check table exists.
     getTableDescriptor();
     // This is the new schema we are going to write out as this modification.

View File

@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.master.MasterServices;

 /**
@@ -40,10 +39,8 @@ public class TableAddFamilyHandler extends TableEventHandler {
   private final HColumnDescriptor familyDesc;

   public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc,
-      Server server, final MasterServices masterServices,
-      HMasterInterface masterInterface, boolean instantChange) throws IOException {
-    super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices,
-      masterInterface, instantChange);
+      Server server, final MasterServices masterServices) throws IOException {
+    super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices);
     HTableDescriptor htd = getTableDescriptor();
     if (htd.hasFamily(familyDesc.getName())) {
       throw new InvalidFamilyOperationException("Family '" +

View File

@@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -39,10 +38,8 @@ public class TableDeleteFamilyHandler extends TableEventHandler {
   private final byte [] familyName;

   public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName,
-      Server server, final MasterServices masterServices,
-      HMasterInterface masterInterface, boolean instantChange) throws IOException {
-    super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices,
-      masterInterface, instantChange);
+      Server server, final MasterServices masterServices) throws IOException {
+    super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices);
     HTableDescriptor htd = getTableDescriptor();
     this.familyName = hasColumnFamily(htd, familyName);
   }

View File

@@ -37,19 +37,13 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.Server;
 import org.apache.hadoop.hbase.ServerName;
 import org.apache.hadoop.hbase.TableExistsException;
+import org.apache.hadoop.hbase.TableNotDisabledException;
 import org.apache.hadoop.hbase.catalog.MetaReader;
 import org.apache.hadoop.hbase.client.HTable;
 import org.apache.hadoop.hbase.executor.EventHandler;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.master.BulkReOpen;
 import org.apache.hadoop.hbase.master.MasterServices;
-import org.apache.hadoop.hbase.monitoring.MonitoredTask;
-import org.apache.hadoop.hbase.monitoring.TaskMonitor;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZKAssign;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.apache.zookeeper.KeeperException;

 import com.google.common.collect.Lists;
@@ -66,23 +60,33 @@ import com.google.common.collect.Maps;
 public abstract class TableEventHandler extends EventHandler {
   private static final Log LOG = LogFactory.getLog(TableEventHandler.class);
   protected final MasterServices masterServices;
-  protected HMasterInterface master = null;
   protected final byte [] tableName;
   protected final String tableNameStr;
-  protected boolean instantAction = false;
   protected boolean persistedToZk = false;

   public TableEventHandler(EventType eventType, byte [] tableName, Server server,
-      MasterServices masterServices, HMasterInterface masterInterface,
-      boolean instantSchemaChange)
+      MasterServices masterServices)
       throws IOException {
     super(server, eventType);
     this.masterServices = masterServices;
     this.tableName = tableName;
-    this.masterServices.checkTableModifiable(tableName, eventType);
+    try {
+      this.masterServices.checkTableModifiable(tableName);
+    } catch (TableNotDisabledException ex) {
+      if (isOnlineSchemaChangeAllowed()
+          && eventType.isOnlineSchemaChangeSupported()) {
+        LOG.debug("Ignoring table not disabled exception " +
+          "for supporting online schema changes.");
+      } else {
+        throw ex;
+      }
+    }
     this.tableNameStr = Bytes.toString(this.tableName);
-    this.instantAction = instantSchemaChange;
-    this.master = masterInterface;
+  }
+
+  private boolean isOnlineSchemaChangeAllowed() {
+    return this.server.getConfiguration().getBoolean(
+      "hbase.online.schema.update.enable", false);
   }

   @Override
@@ -94,7 +98,16 @@ public abstract class TableEventHandler extends EventHandler {
         MetaReader.getTableRegions(this.server.getCatalogTracker(),
           tableName);
       handleTableOperation(hris);
-      handleSchemaChanges(hris);
+      if (eventType.isOnlineSchemaChangeSupported() && this.masterServices.
+          getAssignmentManager().getZKTable().
+          isEnabledTable(Bytes.toString(tableName))) {
+        if (reOpenAllRegions(hris)) {
+          LOG.info("Completed table operation " + eventType + " on table " +
+            Bytes.toString(tableName));
+        } else {
+          LOG.warn("Error on reopening the regions");
+        }
+      }
     } catch (IOException e) {
       LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
     } catch (KeeperException e) {
@@ -105,48 +118,13 @@ public abstract class TableEventHandler extends EventHandler {
     }
   }

-  private void handleSchemaChanges(List<HRegionInfo> regions)
-      throws IOException {
-    if (instantAction && regions != null && !regions.isEmpty()) {
-      handleInstantSchemaChanges(regions);
-    } else {
-      handleRegularSchemaChanges(regions);
-    }
-  }
-
-  /**
-   * Perform schema changes only if the table is in enabled state.
-   * @return
-   */
-  private boolean canPerformSchemaChange() {
-    return (eventType.isSchemaChangeEvent() && this.masterServices.
-      getAssignmentManager().getZKTable().
-      isEnabledTable(Bytes.toString(tableName)));
-  }
-
-  private void handleRegularSchemaChanges(List<HRegionInfo> regions)
-      throws IOException {
-    if (canPerformSchemaChange()) {
-      this.masterServices.getAssignmentManager().setRegionsToReopen(regions);
-      setPersist();
-      if (reOpenAllRegions(regions)) {
-        LOG.info("Completed table operation " + eventType + " on table " +
-          Bytes.toString(tableName));
-      } else {
-        LOG.warn("Error on reopening the regions");
-      }
-    }
-  }
-
   public boolean reOpenAllRegions(List<HRegionInfo> regions) throws IOException {
     boolean done = false;
     LOG.info("Bucketing regions by region server...");
     HTable table = new HTable(masterServices.getConfiguration(), tableName);
     TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps
       .newTreeMap();
-    NavigableMap<HRegionInfo, ServerName> hriHserverMapping
-      = table.getRegionLocations();
+    NavigableMap<HRegionInfo, ServerName> hriHserverMapping = table.getRegionLocations();
     List<HRegionInfo> reRegions = new ArrayList<HRegionInfo>();
     for (HRegionInfo hri : regions) {
       ServerName rsLocation = hriHserverMapping.get(hri);
@@ -188,32 +166,6 @@ public abstract class TableEventHandler extends EventHandler {
     return done;
   }

-  /**
-   * Check whether any of the regions from the list of regions is undergoing a split.
-   * We simply check whether there is a unassigned node for any of the region and if so
-   * we return as true.
-   * @param regionInfos
-   * @return
-   */
-  private boolean isSplitInProgress(List<HRegionInfo> regionInfos) {
-    for (HRegionInfo hri : regionInfos) {
-      ZooKeeperWatcher zkw = this.masterServices.getZooKeeper();
-      String node = ZKAssign.getNodeName(zkw, hri.getEncodedName());
-      try {
-        if (ZKUtil.checkExists(zkw, node) != -1) {
-          LOG.debug("Region " + hri.getRegionNameAsString() + " is unassigned. Assuming" +
-            " that it is undergoing a split");
-          return true;
-        }
-      } catch (KeeperException ke) {
-        LOG.debug("KeeperException while determining splits in progress.", ke);
-        // Assume no splits happening?
-        return false;
-      }
-    }
-    return false;
-  }
-
   /**
    * Table modifications are processed asynchronously, but provide an API for
    * you to query their status.
@@ -238,65 +190,6 @@ public abstract class TableEventHandler extends EventHandler {
     }
   }

-  /**
-   * Wait for region split transaction in progress (if any)
-   * @param regions
-   * @param status
-   */
-  private void waitForInflightSplit(List<HRegionInfo> regions, MonitoredTask status) {
-    while (isSplitInProgress(regions)) {
-      try {
-        status.setStatus("Alter Schema is waiting for split region to complete.");
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
-  protected void handleInstantSchemaChanges(List<HRegionInfo> regions) {
-    if (regions == null || regions.isEmpty()) {
-      LOG.debug("Region size is null or empty. Ignoring alter request.");
-      return;
-    }
-    MonitoredTask status = TaskMonitor.get().createStatus(
-      "Handling alter table request for table = " + tableNameStr);
-    if (canPerformSchemaChange()) {
-      boolean prevBalanceSwitch = false;
-      try {
-        // turn off load balancer synchronously
-        prevBalanceSwitch = master.synchronousBalanceSwitch(false);
-        waitForInflightSplit(regions, status);
-        MasterSchemaChangeTracker masterSchemaChangeTracker =
-          this.masterServices.getSchemaChangeTracker();
-        masterSchemaChangeTracker
-          .createSchemaChangeNode(Bytes.toString(tableName),
-            regions.size());
-        while(!masterSchemaChangeTracker.doesSchemaChangeNodeExists(
-            Bytes.toString(tableName))) {
-          try {
-            Thread.sleep(50);
-          } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-          }
-        }
-        status.markComplete("Created ZK node for handling the alter table request for table = "
-          + tableNameStr);
-      } catch (KeeperException e) {
-        LOG.warn("Instant schema change failed for table " + tableNameStr, e);
-        status.setStatus("Instant schema change failed for table " + tableNameStr
-          + " Cause = " + e.getCause());
-      } catch (IOException ioe) {
-        LOG.warn("Instant schema change failed for table " + tableNameStr, ioe);
-        status.setStatus("Instant schema change failed for table " + tableNameStr
-          + " Cause = " + ioe.getCause());
-      } finally {
-        master.synchronousBalanceSwitch(prevBalanceSwitch);
-      }
-    }
-  }
-
   /**
    * @return Table descriptor for this table
    * @throws TableExistsException
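The retained reOpenAllRegions() (the "Bucketing regions by region server..." hunk above) first groups a table's regions by their hosting server before driving the bulk reopen. A standalone sketch of just that grouping step follows; plain strings stand in for HRegionInfo and ServerName.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class BucketRegionsSketch {
  static TreeMap<String, List<String>> bucketByServer(Map<String, String> regionToServer) {
    TreeMap<String, List<String>> serverToRegions = new TreeMap<String, List<String>>();
    for (Map.Entry<String, String> e : regionToServer.entrySet()) {
      List<String> regions = serverToRegions.get(e.getValue());
      if (regions == null) {
        regions = new ArrayList<String>();
        serverToRegions.put(e.getValue(), regions);
      }
      regions.add(e.getKey()); // one bucket of regions per hosting server
    }
    return serverToRegions;
  }

  public static void main(String[] args) {
    Map<String, String> regionToServer = new HashMap<String, String>();
    regionToServer.put("region-a", "rs1,60020,1");
    regionToServer.put("region-b", "rs2,60020,1");
    regionToServer.put("region-c", "rs1,60020,1");
    // Prints two buckets: rs1 with region-a/region-c, rs2 with region-b.
    System.out.println(bucketByServer(regionToServer));
  }
}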

View File

@@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
 import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.InvalidFamilyOperationException;
 import org.apache.hadoop.hbase.Server;
-import org.apache.hadoop.hbase.ipc.HMasterInterface;
 import org.apache.hadoop.hbase.master.MasterServices;
 import org.apache.hadoop.hbase.util.Bytes;
@@ -41,10 +40,8 @@ public class TableModifyFamilyHandler extends TableEventHandler {
   public TableModifyFamilyHandler(byte[] tableName,
       HColumnDescriptor familyDesc, Server server,
-      final MasterServices masterServices,
-      HMasterInterface masterInterface, boolean instantChange) throws IOException {
-    super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices,
-      masterInterface, instantChange);
+      final MasterServices masterServices) throws IOException {
+    super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices);
     HTableDescriptor htd = getTableDescriptor();
     hasColumnFamily(htd, familyDesc.getName());
     this.familyDesc = familyDesc;

View File

@@ -157,29 +157,12 @@ public class CompactSplitThread implements CompactionRequestor {
     return false;
   }

-  /**
-   * Wait for mid-flight schema alter requests. (if any). We don't want to execute a split
-   * when a schema alter is in progress as we end up in an inconsistent state.
-   * @param tableName
-   */
-  private void waitForInflightSchemaChange(String tableName) {
-    while (this.server.getSchemaChangeTracker()
-        .isSchemaChangeInProgress(tableName)) {
-      try {
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
   public synchronized void requestSplit(final HRegion r, byte[] midKey) {
     if (midKey == null) {
       LOG.debug("Region " + r.getRegionNameAsString() +
         " not splittable because midkey=null");
       return;
     }
-    waitForInflightSchemaChange(r.getRegionInfo().getTableNameAsString());
     try {
       this.splits.execute(new SplitRequest(r, midKey, this.server));
       if (LOG.isDebugEnabled()) {

View File

@@ -148,7 +148,6 @@ import org.apache.hadoop.hbase.util.Threads;
 import org.apache.hadoop.hbase.util.VersionInfo;
 import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker;
 import org.apache.hadoop.hbase.zookeeper.RootRegionTracker;
-import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZKUtil;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
@@ -294,9 +293,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
   // Cluster Status Tracker
   private ClusterStatusTracker clusterStatusTracker;

-  // Schema change Tracker
-  private SchemaChangeTracker schemaChangeTracker;
-
   // Log Splitting Worker
   private SplitLogWorker splitLogWorker;
@@ -599,11 +595,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf,
       this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE));
     catalogTracker.start();
-
-    // Schema change tracker
-    this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper,
-      this, this);
-    this.schemaChangeTracker.start();
   }

   /**
@@ -2901,26 +2892,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     splitRegion(regionInfo, null);
   }

-  /**
-   * Wait for mid-flight schema change requests. (if any)
-   * @param tableName
-   */
-  private void waitForSchemaChange(String tableName) {
-    while (schemaChangeTracker.isSchemaChangeInProgress(tableName)) {
-      try {
-        LOG.debug("Schema alter is inprogress for table = " + tableName
-          + " Waiting for alter to complete before a split");
-        Thread.sleep(100);
-      } catch (InterruptedException e) {
-        Thread.currentThread().interrupt();
-      }
-    }
-  }
-
   @Override
   public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint)
       throws NotServingRegionException, IOException {
-    waitForSchemaChange(Bytes.toString(regionInfo.getTableName()));
     checkOpen();
     HRegion region = getRegion(regionInfo.getRegionName());
     region.flushcache();
@@ -3671,33 +3645,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     return wal.rollWriter(true);
   }

-  /**
-   * Refresh schema changes for given region.
-   * @param hRegion HRegion to refresh
-   * @throws IOException
-   */
-  public void refreshRegion(HRegion hRegion) throws IOException {
-    if (hRegion != null) {
-      synchronized (this.onlineRegions) {
-        HRegionInfo regionInfo = hRegion.getRegionInfo();
-        // Close the region
-        hRegion.close();
-        // Remove from online regions
-        removeFromOnlineRegions(regionInfo.getEncodedName());
-        // Get new HTD
-        HTableDescriptor htd = this.tableDescriptors.get(regionInfo.getTableName());
-        LOG.debug("HTD for region = " + regionInfo.getRegionNameAsString()
-          + " Is = " + htd );
-        HRegion region =
-          HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog, conf,
-            this, null);
-        // Add new region to the onlineRegions
-        addToOnlineRegions(region);
-      }
-    }
-  }
-
   /**
    * Gets the online regions of the specified table.
    * This method looks at the in-memory onlineRegions. It does not go to <code>.META.</code>.
@@ -3721,10 +3668,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
     return tableRegions;
   }

-  public SchemaChangeTracker getSchemaChangeTracker() {
-    return this.schemaChangeTracker;
-  }
-
   // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070).
   public String[] getCoprocessors() {
     HServerLoad hsl = buildServerLoad();
@@ -3741,5 +3684,4 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler,
       mxBeanInfo);
     LOG.info("Registered RegionServer MXBean");
   }
-
 }

View File

@@ -19,12 +19,12 @@
  */
 package org.apache.hadoop.hbase.regionserver;

-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.hbase.Server;
-
 import java.io.IOException;
 import java.util.List;

+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.Server;
+
 /**
  * Interface to Map of online regions. In the Map, the key is the region's
  * encoded name and the value is an {@link HRegion} instance.
@@ -54,6 +54,7 @@ interface OnlineRegions extends Server {
    *         null if named region is not member of the online regions.
    */
   public HRegion getFromOnlineRegions(String encodedRegionName);
+
   /**
    * Get all online regions of a table in this RS.
    * @param tableName
@@ -61,11 +62,4 @@ interface OnlineRegions extends Server {
    * @throws java.io.IOException
    */
   public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException;
-
-  /**
-   * Refresh a given region updating it with latest HTD info.
-   * @param hRegion
-   */
-  public void refreshRegion(HRegion hRegion) throws IOException;
 }

View File

@@ -1,828 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.master.MasterServices;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.io.Writable;
import org.apache.zookeeper.KeeperException;
@InterfaceAudience.Private
public class MasterSchemaChangeTracker extends ZooKeeperNodeTracker {
public static final Log LOG = LogFactory.getLog(MasterSchemaChangeTracker.class);
private final MasterServices masterServices;
// Used by tests only. Do not change this.
private volatile int sleepTimeMillis = 0;
// schema changes pending more than this time will be timed out.
private long schemaChangeTimeoutMillis = 30000;
/**
* Constructs a new ZK node tracker.
* <p/>
* <p>After construction, use {@link #start} to kick off tracking.
*
* @param watcher
* @param abortable
*/
public MasterSchemaChangeTracker(ZooKeeperWatcher watcher,
Abortable abortable, MasterServices masterServices,
long schemaChangeTimeoutMillis) {
super(watcher, watcher.schemaZNode, abortable);
this.masterServices = masterServices;
this.schemaChangeTimeoutMillis = schemaChangeTimeoutMillis;
}
@Override
public void start() {
try {
watcher.registerListener(this);
List<String> tables =
ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
processCompletedSchemaChanges(tables);
} catch (KeeperException e) {
LOG.error("MasterSchemaChangeTracker startup failed.", e);
abortable.abort("MasterSchemaChangeTracker startup failed", e);
}
}
private List<String> getCurrentTables() throws KeeperException {
return
ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
}
/**
* When a primary master crashes and the secondary master takes over
* mid-flight during an alter process, the secondary should cleanup any completed
* schema changes not handled by the previous master.
* @param tables
* @throws KeeperException
*/
private void processCompletedSchemaChanges(List<String> tables)
throws KeeperException {
if (tables == null || tables.isEmpty()) {
String msg = "No current schema change in progress. Skipping cleanup";
LOG.debug(msg);
return;
}
String msg = "Master seeing following tables undergoing schema change " +
"process. Tables = " + tables;
MonitoredTask status = TaskMonitor.get().createStatus(msg);
LOG.debug(msg);
for (String table : tables) {
LOG.debug("Processing table = "+ table);
status.setStatus("Processing table = "+ table);
try {
processTableNode(table);
} catch (IOException e) {
String errmsg = "IOException while processing completed schema changes."
+ " Cause = " + e.getCause();
LOG.error(errmsg, e);
status.setStatus(errmsg);
}
}
}
/**
* Get current alter status for a table.
* @param tableName
* @return MasterAlterStatus
* @throws KeeperException
* @throws IOException
*/
public MasterAlterStatus getMasterAlterStatus(String tableName)
throws KeeperException, IOException {
String path = getSchemaChangeNodePathForTable(tableName);
byte[] state = ZKUtil.getData(watcher, path);
if (state == null || state.length <= 0) {
return null;
}
MasterAlterStatus mas = new MasterAlterStatus();
Writables.getWritable(state, mas);
return mas;
}
/**
* Get RS specific alter status for a table & server
* @param tableName
* @param serverName
* @return Region Server's Schema alter status
* @throws KeeperException
* @throws IOException
*/
private SchemaChangeTracker.SchemaAlterStatus getRSSchemaAlterStatus(
String tableName, String serverName)
throws KeeperException, IOException {
String childPath =
getSchemaChangeNodePathForTableAndServer(tableName, serverName);
byte[] childData = ZKUtil.getData(this.watcher, childPath);
if (childData == null || childData.length <= 0) {
return null;
}
SchemaChangeTracker.SchemaAlterStatus sas =
new SchemaChangeTracker.SchemaAlterStatus();
Writables.getWritable(childData, sas);
LOG.debug("Schema Status data for server = " + serverName + " table = "
+ tableName + " == " + sas);
return sas;
}
/**
* Update the master's alter status based on all region server's response.
* @param servers
* @param tableName
* @throws IOException
*/
private void updateMasterAlterStatus(MasterAlterStatus mas,
List<String> servers, String tableName)
throws IOException, KeeperException {
for (String serverName : servers) {
SchemaChangeTracker.SchemaAlterStatus sas =
getRSSchemaAlterStatus(tableName, serverName);
if (sas != null) {
mas.update(sas);
LOG.debug("processTableNodeWithState:Updated Master Alter Status = "
+ mas + " for server = " + serverName);
} else {
LOG.debug("SchemaAlterStatus is NULL for table = " + tableName);
}
}
}
/**
* If schema alter is handled for this table, then delete all the ZK nodes
* created for this table.
* @param tableName
* @throws KeeperException
*/
private void processTableNode(String tableName) throws KeeperException,
IOException {
LOG.debug("processTableNodeWithState. TableName = " + tableName);
List<String> servers =
ZKUtil.listChildrenAndWatchThem(watcher,
getSchemaChangeNodePathForTable(tableName));
MasterAlterStatus mas = getMasterAlterStatus(tableName);
if (mas == null) {
LOG.debug("MasterAlterStatus is NULL. Table = " + tableName);
return;
}
updateMasterAlterStatus(mas, servers, tableName);
LOG.debug("Current Alter status = " + mas);
String nodePath = getSchemaChangeNodePathForTable(tableName);
ZKUtil.updateExistingNodeData(this.watcher, nodePath,
Writables.getBytes(mas), getZKNodeVersion(nodePath));
processAlterStatus(mas, tableName, servers);
}
/**
* Evaluate the master alter status and determine the current status.
* @param alterStatus
* @param tableName
* @param servers
* @param status
*/
private void processAlterStatus(MasterAlterStatus alterStatus,
String tableName, List<String> servers)
throws KeeperException {
if (alterStatus.getNumberOfRegionsToProcess()
== alterStatus.getNumberOfRegionsProcessed()) {
// schema change completed.
String msg = "All region servers have successfully processed the " +
"schema changes for table = " + tableName
+ " . Deleting the schema change node for table = "
+ tableName + " Region servers processed the schema change" +
" request = " + alterStatus.getProcessedHosts()
+ " Total number of regions = " + alterStatus.getNumberOfRegionsToProcess()
+ " Processed regions = " + alterStatus.getNumberOfRegionsProcessed();
MonitoredTask status = TaskMonitor.get().createStatus(
"Checking alter schema request status for table = " + tableName);
status.markComplete(msg);
LOG.debug(msg);
cleanProcessedTableNode(getSchemaChangeNodePathForTable(tableName));
} else {
if (alterStatus.getErrorCause() != null
&& alterStatus.getErrorCause().trim().length() > 0) {
String msg = "Alter schema change failed "
+ "for table = " + tableName + " Number of online regions = "
+ alterStatus.getNumberOfRegionsToProcess() + " processed regions count = "
+ alterStatus.getNumberOfRegionsProcessed()
+ " Original list = " + alterStatus.hostsToProcess + " Processed servers = "
+ servers
+ " Error Cause = " + alterStatus.getErrorCause();
MonitoredTask status = TaskMonitor.get().createStatus(
"Checking alter schema request status for table = " + tableName);
// we have errors.
LOG.debug(msg);
status.abort(msg);
} else {
String msg = "Not all region servers have processed the schema changes"
+ "for table = " + tableName + " Number of online regions = "
+ alterStatus.getNumberOfRegionsToProcess() + " processed regions count = "
+ alterStatus.getNumberOfRegionsProcessed()
+ " Original list = " + alterStatus.hostsToProcess + " Processed servers = "
+ servers + " Alter STate = "
+ alterStatus.getCurrentAlterStatus();
LOG.debug(msg);
// status.setStatus(msg);
}
}
}
/**
* Check whether a in-flight schema change request has expired.
* @param tableName
* @return true is the schema change request expired.
* @throws IOException
*/
private boolean hasSchemaChangeExpiredFor(String tableName)
throws IOException, KeeperException {
MasterAlterStatus mas = getMasterAlterStatus(tableName);
long createdTimeStamp = mas.getStamp();
long duration = System.currentTimeMillis() - createdTimeStamp;
LOG.debug("Created TimeStamp = " + createdTimeStamp
+ " duration = " + duration + " Table = " + tableName
+ " Master Alter Status = " + mas);
return (duration > schemaChangeTimeoutMillis);
}
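/**
* Illustrative sketch only (not part of the original patch): the expiry
* test above is plain timestamp arithmetic. With the default
* hbase.instant.schema.alter.timeout of 60000 ms, a request stamped more
* than a minute ago is considered expired and becomes eligible for janitor
* cleanup.
*/
static boolean wouldExpire(long requestStamp, long timeoutMillis) {
// e.g. wouldExpire(System.currentTimeMillis() - 61000L, 60000L) == true
return (System.currentTimeMillis() - requestStamp) > timeoutMillis;
}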
/**
* Handle failed and expired schema changes. We simply delete all the
* expired/failed schema change attempts. Why do this?
* 1) Keeping failed/expired schema change nodes around prohibits any
* future schema changes for the table.
* 2) Any lingering expired/failed schema change requests will prohibit the
* load balancer from running.
*/
public void handleFailedOrExpiredSchemaChanges() {
try {
List<String> tables = getCurrentTables();
for (String table : tables) {
String statmsg = "Cleaning failed or expired schema change requests. " +
"current tables undergoing " +
"schema change process = " + tables;
MonitoredTask status = TaskMonitor.get().createStatus(statmsg);
LOG.debug(statmsg);
if (hasSchemaChangeExpiredFor(table)) {
// time out.. currently, we abandon the in-flight schema change due to
// time out.
// Here, there are couple of options to consider. One could be to
// attempt a retry of the schema change and see if it succeeds, and
// another could be to simply rollback the schema change effort and
// see if it succeeds.
String msg = "Schema change for table = " + table + " has expired."
+ " Schema change for this table has been in progress for " +
+ schemaChangeTimeoutMillis +
"Deleting the node now.";
LOG.debug(msg);
ZKUtil.deleteNodeRecursively(this.watcher,
getSchemaChangeNodePathForTable(table));
} else {
String msg = "Schema change request is in progress for " +
" table = " + table;
LOG.debug(msg);
status.setStatus(msg);
}
}
} catch (IOException e) {
String msg = "IOException during handleFailedExpiredSchemaChanges."
+ e.getCause();
LOG.error(msg, e);
TaskMonitor.get().createStatus(msg);
} catch (KeeperException ke) {
String msg = "KeeperException during handleFailedExpiredSchemaChanges."
+ ke.getCause();
LOG.error(msg, ke);
TaskMonitor.get().createStatus(msg);
}
}
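/**
* Illustrative sketch only (not part of the original patch): the janitor
* wake-up period and the expiry timeout consumed above are plain
* configuration knobs; a test or hbase-site.xml override might tune them
* like this (fully-qualified type used since this file does not import
* Configuration).
*/
static void exampleJanitorTuning(org.apache.hadoop.conf.Configuration conf) {
// Sweep expired/failed schema change requests every 10 seconds...
conf.setInt("hbase.instant.schema.janitor.period", 10000);
// ...and treat any alter older than 30 seconds as expired.
conf.setInt("hbase.instant.schema.alter.timeout", 30000);
}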
/**
* Clean the nodes of completed schema change table.
* @param path
* @throws KeeperException
*/
private void cleanProcessedTableNode(String path) throws KeeperException {
if (sleepTimeMillis > 0) {
try {
LOG.debug("Master schema change tracker sleeping for "
+ sleepTimeMillis);
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
ZKUtil.deleteNodeRecursively(this.watcher, path);
LOG.debug("Deleted all nodes for path " + path);
}
/**
* Exclude a RS from schema change processing (if applicable).
* We exclude a RS if 1) it has online regions for the table AND 2) it went
* down mid-flight during the schema change process. We need not wait on a
* RS that died mid-flight: its online regions get reassigned to other
* region servers, and the reassignment process inherently applies the
* schema change as well.
* @param serverName
*/
public void excludeRegionServerForSchemaChanges(String serverName) {
try {
MonitoredTask status = TaskMonitor.get().createStatus(
"Processing schema change exclusion for region server = " + serverName);
List<String> tables =
ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode);
if (tables == null || tables.isEmpty()) {
String msg = "No schema change in progress. Skipping exclusion for " +
"server = "+ serverName;
LOG.debug(msg);
status.setStatus(msg);
return ;
}
for (String tableName : tables) {
excludeRegionServer(tableName, serverName, status);
}
} catch (KeeperException ke) {
LOG.error("KeeperException during excludeRegionServerForSchemaChanges", ke);
} catch (IOException ioe) {
LOG.error("IOException during excludeRegionServerForSchemaChanges", ioe);
}
}
/**
* Check whether a schema change is in progress for a given table on a
* given RS.
* @param tableName
* @param serverName
* @return TRUE if this RS is currently processing a schema change request
* for the table.
* @throws KeeperException
*/
private boolean isSchemaChangeApplicableFor(String tableName,
String serverName)
throws KeeperException {
List<String> servers = ZKUtil.listChildrenAndWatchThem(watcher,
getSchemaChangeNodePathForTable(tableName));
return (servers.contains(serverName));
}
/**
* Exclude a region server for a table (if applicable) from schema change processing.
* @param tableName
* @param serverName
* @param status
* @throws KeeperException
* @throws IOException
*/
private void excludeRegionServer(String tableName, String serverName,
MonitoredTask status)
throws KeeperException, IOException {
if (isSchemaChangeApplicableFor(tableName, serverName)) {
String msg = "Excluding RS " + serverName + " from schema change process" +
" for table = " + tableName;
LOG.debug(msg);
status.setStatus(msg);
SchemaChangeTracker.SchemaAlterStatus sas =
getRSSchemaAlterStatus(tableName, serverName);
if (sas == null) {
LOG.debug("SchemaAlterStatus is NULL for table = " + tableName
+ " server = " + serverName);
return;
}
// Set the status to IGNORED so we can process it accordingly.
sas.setCurrentAlterStatus(
SchemaChangeTracker.SchemaAlterStatus.AlterState.IGNORED);
LOG.debug("Updating the current schema status to " + sas);
String nodePath = getSchemaChangeNodePathForTableAndServer(tableName,
serverName);
ZKUtil.updateExistingNodeData(this.watcher,
nodePath, Writables.getBytes(sas), getZKNodeVersion(nodePath));
} else {
LOG.debug("Skipping exclusion of RS " + serverName
+ " from schema change process"
+ " for table = " + tableName
+ " as it did not possess any online regions for the table");
}
processTableNode(tableName);
}
private int getZKNodeVersion(String nodePath) throws KeeperException {
return ZKUtil.checkExists(this.watcher, nodePath);
}
/**
* Create a new schema change ZK node.
* @param tableName Table name that is getting altered
* @param numberOfRegions Number of online regions to process for the alter
* @throws KeeperException
* @throws IOException
*/
public void createSchemaChangeNode(String tableName,
int numberOfRegions)
throws KeeperException, IOException {
MonitoredTask status = TaskMonitor.get().createStatus(
"Creating schema change node for table = " + tableName);
LOG.debug("Creating schema change node for table = "
+ tableName + " Path = "
+ getSchemaChangeNodePathForTable(tableName));
if (doesSchemaChangeNodeExists(tableName)) {
LOG.debug("Schema change node already exists for table = " + tableName
+ " Deleting the schema change node.");
// If we already see a schema change node for this table we wait till the previous
// alter process is complete. Ideally, we need not wait and we could simply delete
// existing schema change node for this table and create new one. But then the
// RS cloud will not be able to process concurrent schema updates for the same table
// as they will be working with same set of online regions for this table. Meaning the
// second alter change will not see any online regions (as they were being closed and
// re opened by the first change) and will miss the second one.
// We either handle this at the RS level using explicit locks while processing a table
// or do it here. I prefer doing it here as it seems much simpler and cleaner.
while(doesSchemaChangeNodeExists(tableName)) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
int rsCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.rsZNode);
// if number of online RS = 0, we should not do anything!
if (rsCount <= 0) {
String msg = "Master is not seeing any online region servers. Aborting the " +
"schema change processing by region servers.";
LOG.debug(msg);
status.abort(msg);
} else {
LOG.debug("Master is seeing " + rsCount + " region servers online before " +
"the schema change process.");
MasterAlterStatus mas = new MasterAlterStatus(numberOfRegions,
getActiveRegionServersAsString());
LOG.debug("Master creating the master alter status = " + mas);
ZKUtil.createSetData(this.watcher,
getSchemaChangeNodePathForTable(tableName), Writables.getBytes(mas));
status.markComplete("Created the ZK node for schema change. Current Alter Status = "
+ mas.toString());
ZKUtil.listChildrenAndWatchThem(this.watcher,
getSchemaChangeNodePathForTable(tableName));
}
}
private String getActiveRegionServersAsString() {
StringBuilder sbuf = new StringBuilder();
List<ServerName> currentRS =
masterServices.getRegionServerTracker().getOnlineServers();
for (ServerName serverName : currentRS) {
sbuf.append(serverName.getServerName());
sbuf.append(" ");
}
LOG.debug("Current list of RS to process the schema change = "
+ sbuf.toString());
return sbuf.toString();
}
/**
* Check whether a schema change ZK node exists for the given table.
* @param tableName
* @return true if a schema change node exists for the table.
* @throws KeeperException
*/
public boolean doesSchemaChangeNodeExists(String tableName)
throws KeeperException {
return ZKUtil.checkExists(watcher,
getSchemaChangeNodePathForTable(tableName)) != -1;
}
/**
* Check whether any schema change requests are in progress now.
* We simply assume that a schema change is in progress if we see a ZK
* schema node for any table. We may revisit for fine-grained checks such as
* checking the current alter status et al, but it is not required now.
* @return true if any table currently has a schema change ZK node.
*/
public boolean isSchemaChangeInProgress() {
try {
int schemaChangeCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.schemaZNode);
return schemaChangeCount > 0;
} catch (KeeperException ke) {
LOG.debug("KeeperException while getting current schema change progress.",
ke);
// Conservatively report false (no schema change) on a ZK read error.
}
return false;
}
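/**
* Hypothetical guard sketch (not part of the original patch): callers such
* as the load balancer are expected to early-out while this tracker reports
* an in-flight schema change, which is what the load-balancer tests in
* TestInstantSchemaChange assert.
*/
boolean exampleShouldBlockBalancer() {
return isSchemaChangeInProgress();
}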
/**
* We get notified when a RS processes/or completed the schema change request.
* The path will be of the format /hbase/schema/<table name>
* @param path full path of the node whose children have changed
*/
@Override
public void nodeChildrenChanged(String path) {
String tableName = null;
if (path.startsWith(watcher.schemaZNode) &&
!path.equals(watcher.schemaZNode)) {
try {
LOG.debug("NodeChildrenChanged Path = " + path);
tableName = path.substring(path.lastIndexOf("/")+1, path.length());
processTableNode(tableName);
} catch (KeeperException e) {
TaskMonitor.get().createStatus(
"MasterSchemaChangeTracker: ZK exception while processing " +
" nodeChildrenChanged() event for table = " + tableName
+ " Cause = " + e.getCause());
LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
+ " schema change nodes", e);
} catch (IOException ioe) {
TaskMonitor.get().createStatus(
"MasterSchemaChangeTracker: IO exception while processing " +
" nodeChildrenChanged() event for table = " + tableName
+ " Cause = " + ioe.getCause());
LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
+ " schema change nodes", ioe);
}
}
}
/**
* We get notified as and when the RS cloud updates their ZK nodes with
* progress information. The path will be of the format
* /hbase/schema/<table name>/<RS host name>
* @param path
*/
@Override
public void nodeDataChanged(String path) {
String tableName = null;
if (path.startsWith(watcher.schemaZNode) &&
!path.equals(watcher.schemaZNode)) {
try {
LOG.debug("NodeDataChanged Path = " + path);
String[] paths = path.split("/");
tableName = paths[3];
processTableNode(tableName);
} catch (KeeperException e) {
TaskMonitor.get().createStatus(
"MasterSchemaChangeTracker: ZK exception while processing " +
" nodeDataChanged() event for table = " + tableName
+ " Cause = " + e.getCause());
LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting"
+ " schema change nodes", e);
} catch(IOException ioe) {
TaskMonitor.get().createStatus(
"MasterSchemaChangeTracker: IO exception while processing " +
" nodeDataChanged() event for table = " + tableName
+ " Cause = " + ioe.getCause());
LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting"
+ " schema change nodes", ioe);
}
}
}
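/**
* Illustrative sketch only (not part of the original patch, and it assumes
* the default single-level base znode "/hbase"): extracting the table name
* from a per-server status path. "/hbase/schema/t1/rs1,60020,1" splits into
* ["", "hbase", "schema", "t1", "rs1,60020,1"], so index 3 is the table,
* exactly as nodeDataChanged() above relies on.
*/
static String exampleTableFromStatusPath(String path) {
return path.split("/")[3];
}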
public String getSchemaChangeNodePathForTable(String tableName) {
return ZKUtil.joinZNode(watcher.schemaZNode, tableName);
}
/**
* Used only for tests. Do not use this. See TestInstantSchemaChange for
* details on how it is used. It primarily delays the master's
* schema-completion processing so that we can test complex scenarios such
* as master failover.
* @param sleepTimeMillis
*/
public void setSleepTimeMillis(int sleepTimeMillis) {
this.sleepTimeMillis = sleepTimeMillis;
}
private String getSchemaChangeNodePathForTableAndServer(
String tableName, String regionServerName) {
return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName),
regionServerName);
}
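/**
* Illustrative sketch only (not part of the original patch; assumes the
* default base znode "/hbase" and the default zookeeper.znode.schema value
* "schema"): the two helpers above compose a two-level layout, a per-table
* node carrying the serialized MasterAlterStatus plus one child per region
* server carrying that server's SchemaAlterStatus.
*/
static String[] exampleSchemaChangePaths(String tableName, String rsName) {
String tableNode = "/hbase/schema/" + tableName; // master alter status
String serverNode = tableNode + "/" + rsName; // per-RS alter status
return new String[] { tableNode, serverNode };
}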
/**
* Holds the current alter state for a table. Alter state includes the
* current alter status (INPROCESS, FAILURE or SUCCESS; SUCCESS is not used
* at the moment), the timestamp of the alter request, the hosts online at
* the time of the alter request, the number of online regions to process
* for the schema change request, the number of processed regions, and the
* list of region servers that actually processed the schema change request.
*
* The master keeps track of schema change requests using this alter status
* and periodically updates it as the RS cloud reports progress.
*/
public static class MasterAlterStatus implements Writable {
public enum AlterState {
INPROCESS, // alter in progress
SUCCESS, // alter completed
FAILURE // alter failed
}
private AlterState currentAlterStatus;
// TimeStamp
private long stamp;
private int numberOfRegionsToProcess;
private StringBuffer errorCause = new StringBuffer(" ");
private StringBuffer processedHosts = new StringBuffer(" ");
private String hostsToProcess;
private int numberOfRegionsProcessed = 0;
public MasterAlterStatus() {
}
public MasterAlterStatus(int numberOfRegions, String activeHosts) {
this.numberOfRegionsToProcess = numberOfRegions;
this.stamp = System.currentTimeMillis();
this.currentAlterStatus = AlterState.INPROCESS;
this.hostsToProcess = activeHosts;
}
public AlterState getCurrentAlterStatus() {
return currentAlterStatus;
}
public void setCurrentAlterStatus(AlterState currentAlterStatus) {
this.currentAlterStatus = currentAlterStatus;
}
public long getStamp() {
return stamp;
}
public void setStamp(long stamp) {
this.stamp = stamp;
}
public int getNumberOfRegionsToProcess() {
return numberOfRegionsToProcess;
}
public void setNumberOfRegionsToProcess(int numberOfRegionsToProcess) {
this.numberOfRegionsToProcess = numberOfRegionsToProcess;
}
public int getNumberOfRegionsProcessed() {
return numberOfRegionsProcessed;
}
public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) {
// Note: accumulates rather than overwrites. Each region server reports
// only its own processed-region count; the master sums the reports.
this.numberOfRegionsProcessed += numberOfRegionsProcessed;
}
public String getHostsToProcess() {
return hostsToProcess;
}
public void setHostsToProcess(String hostsToProcess) {
this.hostsToProcess = hostsToProcess;
}
public String getErrorCause() {
return errorCause == null ? null : errorCause.toString();
}
public void setErrorCause(String errorCause) {
if (errorCause == null || errorCause.trim().length() <= 0) {
return;
}
if (this.errorCause == null) {
this.errorCause = new StringBuffer(errorCause);
} else {
this.errorCause.append(errorCause);
}
}
public String getProcessedHosts() {
return processedHosts.toString();
}
public void setProcessedHosts(String processedHosts) {
// Appends rather than overwrites: the processed-host list grows as each
// region server reports in.
this.processedHosts.append(" ").append(processedHosts);
}
}
/**
* Ignore or exempt a RS from schema change processing.
* Master will tweak the number of regions to process based on the
* number of online regions on the target RS and also remove the
* RS from list of hosts to process.
* @param schemaAlterStatus
*/
private void ignoreRSForSchemaChange(
SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) {
LOG.debug("Removing RS " + schemaAlterStatus.getHostName()
+ " from schema change process.");
hostsToProcess =
hostsToProcess.replaceAll(schemaAlterStatus.getHostName(), "");
int ignoreRegionsCount = schemaAlterStatus.getNumberOfOnlineRegions();
LOG.debug("Current number of regions processed = "
+ this.numberOfRegionsProcessed + " deducting ignored = "
+ ignoreRegionsCount
+ " final = " + (this.numberOfRegionsToProcess-ignoreRegionsCount));
if (this.numberOfRegionsToProcess > 0) {
this.numberOfRegionsToProcess -= ignoreRegionsCount;
} else {
LOG.debug("Number of regions to process is less than zero. This is odd");
}
}
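/**
* Illustrative sketch only (not part of the original patch): exempting a
* dead region server simply shrinks the completion target. If 10 regions
* were to be processed and the dead server held 4 of them, the alter now
* completes once the surviving servers report 6 processed regions.
*/
static int exampleRemainingTarget(int regionsToProcess, int regionsOnDeadRS) {
return regionsToProcess - regionsOnDeadRS; // e.g. 10 - 4 == 6
}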
/**
* Update the master alter status for this table based on RS alter status.
* @param schemaAlterStatus
*/
public void update(SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) {
this.setProcessedHosts(schemaAlterStatus.getHostName());
SchemaChangeTracker.SchemaAlterStatus.AlterState rsState =
schemaAlterStatus.getCurrentAlterStatus();
switch(rsState) {
case FAILURE:
LOG.debug("Schema update failure Status = "
+ schemaAlterStatus);
this.setCurrentAlterStatus(
MasterAlterStatus.AlterState.FAILURE);
this.setNumberOfRegionsProcessed(
schemaAlterStatus.getNumberOfRegionsProcessed());
this.setErrorCause(schemaAlterStatus.getErrorCause());
break;
case SUCCESS:
LOG.debug("Schema update SUCCESS Status = "
+ schemaAlterStatus);
this.setNumberOfRegionsProcessed(
schemaAlterStatus.getNumberOfRegionsProcessed());
this.setCurrentAlterStatus(MasterAlterStatus.AlterState.SUCCESS);
break;
case IGNORED:
LOG.debug("Schema update IGNORED Updating regions to " +
"process count. Status = "+ schemaAlterStatus);
ignoreRSForSchemaChange(schemaAlterStatus);
break;
default:
break;
}
}
@Override
public void readFields(DataInput in) throws IOException {
currentAlterStatus = AlterState.valueOf(in.readUTF());
stamp = in.readLong();
numberOfRegionsToProcess = in.readInt();
hostsToProcess = Bytes.toString(Bytes.readByteArray(in));
processedHosts = new StringBuffer(Bytes.toString(Bytes.readByteArray(in)));
errorCause = new StringBuffer(Bytes.toString(Bytes.readByteArray(in)));
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(currentAlterStatus.name());
out.writeLong(stamp);
out.writeInt(numberOfRegionsToProcess);
Bytes.writeByteArray(out, Bytes.toBytes(hostsToProcess));
Bytes.writeByteArray(out, Bytes.toBytes(processedHosts.toString()));
Bytes.writeByteArray(out, Bytes.toBytes(errorCause.toString()));
}
@Override
public String toString() {
return
" state= " + currentAlterStatus
+ ", ts= " + stamp
+ ", number of regions to process = " + numberOfRegionsToProcess
+ ", number of regions processed = " + numberOfRegionsProcessed
+ ", hosts = " + hostsToProcess
+ " , processed hosts = " + processedHosts
+ " , errorCause = " + errorCause;
}
}
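/**
* Illustrative round-trip sketch (not part of the original patch):
* MasterAlterStatus is a plain Hadoop Writable, so it travels through
* ZooKeeper as a byte[] exactly as this tracker ships it via
* Writables.getBytes()/Writables.getWritable().
*/
static MasterAlterStatus exampleCopyThroughBytes(MasterAlterStatus in)
throws IOException {
MasterAlterStatus out = new MasterAlterStatus();
// Same mechanics as writing the node data and reading it back.
Writables.getWritable(Writables.getBytes(in), out);
return out;
}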
}

View File

@ -1,478 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.zookeeper;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.monitoring.MonitoredTask;
import org.apache.hadoop.hbase.monitoring.TaskMonitor;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.RegionServerServices;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.zookeeper.KeeperException;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.List;
/**
* Region server schema change tracker. RS uses this tracker to keep track of
* alter schema requests from master and updates the status once the schema change
* is complete.
*/
@InterfaceAudience.Private
public class SchemaChangeTracker extends ZooKeeperNodeTracker {
public static final Log LOG = LogFactory.getLog(SchemaChangeTracker.class);
private RegionServerServices regionServer = null;
private volatile int sleepTimeMillis = 0;
/**
* Constructs a new ZK node tracker.
* <p/>
* <p>After construction, use {@link #start} to kick off tracking.
*
* @param watcher
* @param node
* @param abortable
*/
public SchemaChangeTracker(ZooKeeperWatcher watcher,
Abortable abortable,
RegionServerServices regionServer) {
super(watcher, watcher.schemaZNode, abortable);
this.regionServer = regionServer;
}
@Override
public void start() {
try {
watcher.registerListener(this);
ZKUtil.listChildrenAndWatchThem(watcher, node);
// Clean-up old in-process schema changes for this RS now?
} catch (KeeperException e) {
LOG.error("RegionServer SchemaChangeTracker startup failed with " +
"KeeperException.", e);
}
}
/**
* This event is triggered whenever a new schema change request is processed
* by the master. The path will be of the format /hbase/schema/<table name>
* @param path full path of the node whose children have changed
*/
@Override
public void nodeChildrenChanged(String path) {
LOG.debug("NodeChildrenChanged. Path = " + path);
if (path.equals(watcher.schemaZNode)) {
try {
List<String> tables =
ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode);
LOG.debug("RS.SchemaChangeTracker: " +
"Current list of tables with schema change = " + tables);
if (tables != null) {
handleSchemaChange(tables);
} else {
LOG.error("No tables found for schema change event." +
" Skipping instant schema refresh");
}
} catch (KeeperException ke) {
String errmsg = "KeeperException while handling nodeChildrenChanged for path = "
+ path + " Cause = " + ke.getCause();
LOG.error(errmsg, ke);
TaskMonitor.get().createStatus(errmsg);
}
}
}
private void handleSchemaChange(List<String> tables) {
for (String tableName : tables) {
if (tableName != null) {
LOG.debug("Processing schema change with status for table = " + tableName);
handleSchemaChange(tableName);
}
}
}
private void handleSchemaChange(String tableName) {
int refreshedRegionsCount = 0, onlineRegionsCount = 0;
MonitoredTask status = null;
try {
List<HRegion> onlineRegions =
regionServer.getOnlineRegions(Bytes.toBytes(tableName));
if (onlineRegions != null && !onlineRegions.isEmpty()) {
status = TaskMonitor.get().createStatus("Region server "
+ regionServer.getServerName().getServerName()
+ " handling schema change for table = " + tableName
+ " number of online regions = " + onlineRegions.size());
onlineRegionsCount = onlineRegions.size();
createStateNode(tableName, onlineRegions.size());
for (HRegion hRegion : onlineRegions) {
regionServer.refreshRegion(hRegion);
refreshedRegionsCount++;
}
SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName);
alterStatus.update(SchemaAlterStatus.AlterState.SUCCESS, refreshedRegionsCount);
updateSchemaChangeStatus(tableName, alterStatus);
String msg = "Refresh schema completed for table name = " + tableName
+ " server = " + regionServer.getServerName().getServerName()
+ " online Regions = " + onlineRegions.size()
+ " refreshed Regions = " + refreshedRegionsCount;
LOG.debug(msg);
status.setStatus(msg);
} else {
LOG.debug("Server " + regionServer.getServerName().getServerName()
+ " has no online regions for table = " + tableName
+ " Ignoring the schema change request");
}
} catch (IOException ioe) {
reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
refreshedRegionsCount, ioe, status);
} catch (KeeperException ke) {
reportAndLogSchemaRefreshError(tableName, onlineRegionsCount,
refreshedRegionsCount, ke, status);
}
}
private int getZKNodeVersion(String nodePath) throws KeeperException {
return ZKUtil.checkExists(this.watcher, nodePath);
}
private void reportAndLogSchemaRefreshError(String tableName,
int onlineRegionsCount,
int refreshedRegionsCount,
Throwable exception,
MonitoredTask status) {
try {
String errmsg =
" Region Server " + regionServer.getServerName().getServerName()
+ " failed during schema change process. Cause = "
+ exception.getCause()
+ " Number of onlineRegions = " + onlineRegionsCount
+ " Processed regions = " + refreshedRegionsCount;
SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName);
alterStatus.update(SchemaAlterStatus.AlterState.FAILURE,
refreshedRegionsCount, errmsg);
String nodePath = getSchemaChangeNodePathForTableAndServer(tableName,
regionServer.getServerName().getServerName());
ZKUtil.updateExistingNodeData(this.watcher, nodePath,
Writables.getBytes(alterStatus), getZKNodeVersion(nodePath));
LOG.info("reportAndLogSchemaRefreshError() " +
" Updated child ZKNode with SchemaAlterStatus = "
+ alterStatus + " for table = " + tableName);
if (status == null) {
status = TaskMonitor.get().createStatus(errmsg);
} else {
status.setStatus(errmsg);
}
} catch (KeeperException e) {
// Retry ?
String errmsg = "KeeperException while updating the schema change node with "
+ "error status for table = "
+ tableName + " server = "
+ regionServer.getServerName().getServerName()
+ " Cause = " + e.getCause();
LOG.error(errmsg, e);
TaskMonitor.get().createStatus(errmsg);
} catch(IOException ioe) {
// retry ??
String errmsg = "IOException while updating the schema change node with "
+ "server name for table = "
+ tableName + " server = "
+ regionServer.getServerName().getServerName()
+ " Cause = " + ioe.getCause();
TaskMonitor.get().createStatus(errmsg);
LOG.error(errmsg, ioe);
}
}
private void createStateNode(String tableName, int numberOfOnlineRegions)
throws IOException {
SchemaAlterStatus sas =
new SchemaAlterStatus(regionServer.getServerName().getServerName(),
numberOfOnlineRegions);
LOG.debug("Creating Schema Alter State node = " + sas);
try {
ZKUtil.createSetData(this.watcher,
getSchemaChangeNodePathForTableAndServer(tableName,
regionServer.getServerName().getServerName()),
Writables.getBytes(sas));
} catch (KeeperException ke) {
String errmsg = "KeeperException while creating the schema change node with "
+ "server name for table = "
+ tableName + " server = "
+ regionServer.getServerName().getServerName()
+ " Message = " + ke.getCause();
LOG.error(errmsg, ke);
TaskMonitor.get().createStatus(errmsg);
}
}
private SchemaAlterStatus getSchemaAlterStatus(String tableName)
throws KeeperException, IOException {
byte[] statusBytes = ZKUtil.getData(this.watcher,
getSchemaChangeNodePathForTableAndServer(tableName,
regionServer.getServerName().getServerName()));
if (statusBytes == null || statusBytes.length <= 0) {
return null;
}
SchemaAlterStatus sas = new SchemaAlterStatus();
Writables.getWritable(statusBytes, sas);
return sas;
}
private void updateSchemaChangeStatus(String tableName,
SchemaAlterStatus schemaAlterStatus)
throws KeeperException, IOException {
try {
if (sleepTimeMillis > 0) {
try {
LOG.debug("SchemaChangeTracker sleeping for "
+ sleepTimeMillis);
Thread.sleep(sleepTimeMillis);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
ZKUtil.updateExistingNodeData(this.watcher,
getSchemaChangeNodePathForTableAndServer(tableName,
regionServer.getServerName().getServerName()),
Writables.getBytes(schemaAlterStatus), -1);
String msg = "Schema change tracker completed for table = " + tableName
+ " status = " + schemaAlterStatus;
LOG.debug(msg);
TaskMonitor.get().createStatus(msg);
} catch (KeeperException.NoNodeException e) {
String errmsg = "KeeperException.NoNodeException while updating the schema "
+ "change node with server name for table = "
+ tableName + " server = "
+ regionServer.getServerName().getServerName()
+ " Cause = " + e.getCause();
TaskMonitor.get().createStatus(errmsg);
LOG.error(errmsg, e);
} catch (KeeperException e) {
// Retry ?
String errmsg = "KeeperException while updating the schema change node with "
+ "server name for table = "
+ tableName + " server = "
+ regionServer.getServerName().getServerName()
+ " Cause = " + e.getCause();
LOG.error(errmsg, e);
TaskMonitor.get().createStatus(errmsg);
} catch(IOException ioe) {
String errmsg = "IOException while updating the schema change node with "
+ "server name for table = "
+ tableName + " server = "
+ regionServer.getServerName().getServerName()
+ " Cause = " + ioe.getCause();
LOG.error(errmsg, ioe);
TaskMonitor.get().createStatus(errmsg);
}
}
private String getSchemaChangeNodePathForTable(String tableName) {
return ZKUtil.joinZNode(watcher.schemaZNode, tableName);
}
private String getSchemaChangeNodePathForTableAndServer(
String tableName, String regionServerName) {
return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName),
regionServerName);
}
public int getSleepTimeMillis() {
return sleepTimeMillis;
}
/**
* Set a sleep time in millis before this RS can update its progress status.
* Used only for test cases to test complex scenarios such as RS failures
* and RS exemption handling.
* @param sleepTimeMillis
*/
public void setSleepTimeMillis(int sleepTimeMillis) {
this.sleepTimeMillis = sleepTimeMillis;
}
/**
* Check whether a schema change request is in progress now for the given
* table. We simply assume that a schema change is in progress if we see a
* ZK schema node for the table. We may revisit for fine-grained checks such
* as checking the current alter status et al, but it is not required now.
* @param tableName
* @return true if the table currently has a schema change ZK node.
*/
public boolean isSchemaChangeInProgress(String tableName) {
try {
List<String> schemaChanges = ZKUtil.listChildrenAndWatchThem(this.watcher,
watcher.schemaZNode);
return schemaChanges != null && schemaChanges.contains(tableName);
} catch (KeeperException ke) {
LOG.debug("isSchemaChangeInProgress. " +
"KeeperException while getting current schema change progress.", ke);
return false;
}
}
/**
* Holds the current alter state for a table on this region server. Alter
* state includes the current alter status (INPROCESS, FAILURE, SUCCESS, or
* IGNORED), the current RS host name, the timestamp of the alter request,
* the number of online regions this RS has for the given table, the number
* of processed regions, and an errorCause in case the RS failed during the
* schema change process.
*
* The RS keeps track of schema change requests per table using this alter
* status and periodically updates it as the schema change progresses.
*/
public static class SchemaAlterStatus implements Writable {
public enum AlterState {
INPROCESS, // alter in progress
SUCCESS, // alter completed
FAILURE, // alter failed
IGNORED // this RS is exempt from processing the alter
}
private AlterState currentAlterStatus;
// TimeStamp
private long stamp;
private int numberOfOnlineRegions;
private String errorCause = " ";
private String hostName;
private int numberOfRegionsProcessed = 0;
public SchemaAlterStatus() {
}
public SchemaAlterStatus(String hostName, int numberOfOnlineRegions) {
this.numberOfOnlineRegions = numberOfOnlineRegions;
this.stamp = System.currentTimeMillis();
this.currentAlterStatus = AlterState.INPROCESS;
this.hostName = hostName;
}
public AlterState getCurrentAlterStatus() {
return currentAlterStatus;
}
public void setCurrentAlterStatus(AlterState currentAlterStatus) {
this.currentAlterStatus = currentAlterStatus;
}
public int getNumberOfOnlineRegions() {
return numberOfOnlineRegions;
}
public void setNumberOfOnlineRegions(int numberOfRegions) {
this.numberOfOnlineRegions = numberOfRegions;
}
public int getNumberOfRegionsProcessed() {
return numberOfRegionsProcessed;
}
public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) {
this.numberOfRegionsProcessed = numberOfRegionsProcessed;
}
public String getErrorCause() {
return errorCause;
}
public void setErrorCause(String errorCause) {
this.errorCause = errorCause;
}
public String getHostName() {
return hostName;
}
public void setHostName(String hostName) {
this.hostName = hostName;
}
public void update(AlterState state, int numberOfRegions, String errorCause) {
this.currentAlterStatus = state;
this.numberOfRegionsProcessed = numberOfRegions;
this.errorCause = errorCause;
}
public void update(AlterState state, int numberOfRegions) {
this.currentAlterStatus = state;
this.numberOfRegionsProcessed = numberOfRegions;
}
public void update(AlterState state) {
this.currentAlterStatus = state;
}
public void update(SchemaAlterStatus status) {
this.currentAlterStatus = status.getCurrentAlterStatus();
this.numberOfRegionsProcessed = status.getNumberOfRegionsProcessed();
this.errorCause = status.getErrorCause();
}
@Override
public void readFields(DataInput in) throws IOException {
currentAlterStatus = AlterState.valueOf(in.readUTF());
stamp = in.readLong();
numberOfOnlineRegions = in.readInt();
hostName = Bytes.toString(Bytes.readByteArray(in));
numberOfRegionsProcessed = in.readInt();
errorCause = Bytes.toString(Bytes.readByteArray(in));
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(currentAlterStatus.name());
out.writeLong(stamp);
out.writeInt(numberOfOnlineRegions);
Bytes.writeByteArray(out, Bytes.toBytes(hostName));
out.writeInt(numberOfRegionsProcessed);
Bytes.writeByteArray(out, Bytes.toBytes(errorCause));
}
@Override
public String toString() {
return
" state= " + currentAlterStatus
+ ", ts= " + stamp
+ ", number of online regions = " + numberOfOnlineRegions
+ ", host= " + hostName + " processed regions = " + numberOfRegionsProcessed
+ ", errorCause = " + errorCause;
}
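/**
* Illustrative sketch only (not part of the original patch): the life of a
* per-server status as the tracker above drives it, from INPROCESS at node
* creation to SUCCESS once every online region has been refreshed.
*/
static SchemaAlterStatus exampleLifecycle() {
// createStateNode(): INPROCESS with this server's online region count.
SchemaAlterStatus sas = new SchemaAlterStatus("rs1.example.com,60020,1", 3);
// handleSchemaChange(): all 3 regions refreshed successfully.
sas.update(AlterState.SUCCESS, 3);
return sas;
}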
}
}

View File

@ -102,8 +102,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
public String clusterIdZNode;
// znode used for log splitting work assignment
public String splitLogZNode;
// znode used to record table schema changes
public String schemaZNode;
// Certain ZooKeeper nodes need to be world-readable
public static final ArrayList<ACL> CREATOR_ALL_AND_WORLD_READABLE =
@ -166,7 +164,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
ZKUtil.createAndFailSilent(this, drainingZNode);
ZKUtil.createAndFailSilent(this, tableZNode);
ZKUtil.createAndFailSilent(this, splitLogZNode);
ZKUtil.createAndFailSilent(this, schemaZNode);
ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode);
} catch (KeeperException e) {
throw new ZooKeeperConnectionException(
@ -215,8 +212,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable {
conf.get("zookeeper.znode.clusterId", "hbaseid")); conf.get("zookeeper.znode.clusterId", "hbaseid"));
splitLogZNode = ZKUtil.joinZNode(baseZNode, splitLogZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME));
schemaZNode = ZKUtil.joinZNode(baseZNode,
conf.get("zookeeper.znode.schema", "schema"));
}
/**

View File

@ -790,30 +790,6 @@
simplify coprocessor failure analysis.
</description>
</property>
<property>
<name>hbase.instant.schema.alter.enabled</name>
<value>false</value>
<description>Whether to handle alter schema changes instantly.
If enabled, all schema alter operations will be instant, as the master will not
explicitly unassign/assign the impacted regions and instead will rely on region servers to
refresh their schema changes. If enabled, the schema alter requests will survive
master or RS failures.
</description>
</property>
<property>
<name>hbase.instant.schema.janitor.period</name>
<value>120000</value>
<description>The schema janitor process wakes up at this interval (in
milliseconds) and sweeps all expired/failed schema change requests.
</description>
</property>
<property>
<name>hbase.instant.schema.alter.timeout</name>
<value>60000</value>
<description>Timeout in millis after which any pending schema alter request will be
considered as failed.
</description>
</property>
<property>
<name>hbase.online.schema.update.enable</name>
<value>false</value>

View File

@ -1,169 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class InstantSchemaChangeTestBase {
final Log LOG = LogFactory.getLog(getClass());
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected HBaseAdmin admin;
protected static MiniHBaseCluster miniHBaseCluster = null;
protected Configuration conf;
protected static MasterSchemaChangeTracker msct = null;
protected final byte [] row = Bytes.toBytes("row");
protected final byte [] qualifier = Bytes.toBytes("qualifier");
final byte [] value = Bytes.toBytes("value");
@Before
public void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true);
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 10000);
TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000);
miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5);
msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
}
@After
public void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* Find the RS that is currently holding our online region.
* @param tableName
* @return
*/
protected HRegionServer findRSWithOnlineRegionFor(String tableName) {
List<JVMClusterUtil.RegionServerThread> rsThreads =
miniHBaseCluster.getLiveRegionServerThreads();
for (JVMClusterUtil.RegionServerThread rsT : rsThreads) {
HRegionServer rs = rsT.getRegionServer();
List<HRegion> regions = rs.getOnlineRegions(Bytes.toBytes(tableName));
if (regions != null && !regions.isEmpty()) {
return rs;
}
}
return null;
}
protected void waitForSchemaChangeProcess(final String tableName)
throws KeeperException, InterruptedException {
waitForSchemaChangeProcess(tableName, 10000);
}
/**
* This is a pretty low-cost signalling mechanism. It is quite possible that
* we will miss the ZK node creation signal, as in some cases the schema
* change process happens rather quickly and our thread waiting for ZK node
* creation might wait forever. The fool-proof strategy would be to listen
* for ZK events directly.
* @param tableName
* @throws KeeperException
* @throws InterruptedException
*/
protected void waitForSchemaChangeProcess(final String tableName, final long waitTimeMills)
throws KeeperException, InterruptedException {
LOG.info("Waiting for ZK node creation for table = " + tableName);
final MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
final Runnable r = new Runnable() {
public void run() {
try {
while (!msct.doesSchemaChangeNodeExists(tableName)) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (KeeperException ke) {
ke.printStackTrace();
}
LOG.info("Waiting for ZK node deletion for table = " + tableName);
try {
while (msct.doesSchemaChangeNodeExists(tableName)) {
try {
Thread.sleep(50);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (KeeperException ke) {
ke.printStackTrace();
}
}
};
Thread t = new Thread(r);
t.start();
if (waitTimeMills > 0) {
t.join(waitTimeMills);
} else {
t.join(10000);
}
}
protected HTable createTableAndValidate(String tableName) throws IOException {
conf = TEST_UTIL.getConfiguration();
LOG.info("Start createTableAndValidate()");
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
HTableDescriptor[] tables = admin.listTables();
int numTables = 0;
if (tables != null) {
numTables = tables.length;
}
HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
HConstants.CATALOG_FAMILY);
tables = this.admin.listTables();
assertEquals(numTables + 1, tables.length);
LOG.info("created table = " + tableName);
return ht;
}
}

View File

@ -1,473 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestInstantSchemaChange extends InstantSchemaChangeTestBase {
final Log LOG = LogFactory.getLog(getClass());
@Test
public void testInstantSchemaChangeForModifyTable() throws IOException,
KeeperException, InterruptedException {
String tableName = "testInstantSchemaChangeForModifyTable";
conf = TEST_UTIL.getConfiguration();
LOG.info("Start testInstantSchemaChangeForModifyTable()");
HTable ht = createTableAndValidate(tableName);
String newFamily = "newFamily";
HTableDescriptor htd = new HTableDescriptor(tableName);
htd.addFamily(new HColumnDescriptor(newFamily));
admin.modifyTable(Bytes.toBytes(tableName), htd);
waitForSchemaChangeProcess(tableName);
assertFalse(msct.doesSchemaChangeNodeExists(tableName));
Put put1 = new Put(row);
put1.add(Bytes.toBytes(newFamily), qualifier, value);
ht.put(put1);
Get get1 = new Get(row);
get1.addColumn(Bytes.toBytes(newFamily), qualifier);
Result r = ht.get(get1);
byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
int result = Bytes.compareTo(value, tvalue);
assertEquals(result, 0);
LOG.info("END testInstantSchemaChangeForModifyTable()");
ht.close();
}
@Test
public void testInstantSchemaChangeForAddColumn() throws IOException,
KeeperException, InterruptedException {
LOG.info("Start testInstantSchemaChangeForAddColumn() ");
String tableName = "testSchemachangeForAddColumn";
HTable ht = createTableAndValidate(tableName);
String newFamily = "newFamily";
HColumnDescriptor hcd = new HColumnDescriptor("newFamily");
admin.addColumn(Bytes.toBytes(tableName), hcd);
waitForSchemaChangeProcess(tableName);
assertFalse(msct.doesSchemaChangeNodeExists(tableName));
Put put1 = new Put(row);
put1.add(Bytes.toBytes(newFamily), qualifier, value);
LOG.info("******** Put into new column family ");
ht.put(put1);
Get get1 = new Get(row);
get1.addColumn(Bytes.toBytes(newFamily), qualifier);
Result r = ht.get(get1);
byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
LOG.info(" Value put = " + value + " value from table = " + tvalue);
int result = Bytes.compareTo(value, tvalue);
assertEquals(result, 0);
LOG.info("End testInstantSchemaChangeForAddColumn() ");
ht.close();
}
@Test
public void testInstantSchemaChangeForModifyColumn() throws IOException,
KeeperException, InterruptedException {
LOG.info("Start testInstantSchemaChangeForModifyColumn() ");
String tableName = "testSchemachangeForModifyColumn";
createTableAndValidate(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
hcd.setMaxVersions(99);
hcd.setBlockCacheEnabled(false);
admin.modifyColumn(Bytes.toBytes(tableName), hcd);
waitForSchemaChangeProcess(tableName);
assertFalse(msct.doesSchemaChangeNodeExists(tableName));
List<HRegion> onlineRegions
= miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
for (HRegion onlineRegion : onlineRegions) {
HTableDescriptor htd = onlineRegion.getTableDesc();
HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
assertFalse(tableHcd.isBlockCacheEnabled());
assertEquals(99, tableHcd.getMaxVersions());
}
LOG.info("End testInstantSchemaChangeForModifyColumn() ");
}
@Test
public void testInstantSchemaChangeForDeleteColumn() throws IOException,
KeeperException, InterruptedException {
LOG.info("Start testInstantSchemaChangeForDeleteColumn() ");
String tableName = "testSchemachangeForDeleteColumn";
int numTables = 0;
HTableDescriptor[] tables = admin.listTables();
if (tables != null) {
numTables = tables.length;
}
byte[][] FAMILIES = new byte[][] {
Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") };
HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
FAMILIES);
tables = this.admin.listTables();
assertEquals(numTables + 1, tables.length);
LOG.info("Table testSchemachangeForDeleteColumn created");
admin.deleteColumn(tableName, "C");
waitForSchemaChangeProcess(tableName);
assertFalse(msct.doesSchemaChangeNodeExists(tableName));
HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(Bytes.toBytes(tableName));
HColumnDescriptor hcd = modifiedHtd.getFamily(Bytes.toBytes("C"));
assertTrue(hcd == null);
LOG.info("End testInstantSchemaChangeForDeleteColumn() ");
ht.close();
}
@Test
public void testInstantSchemaChangeWhenTableIsNotEnabled() throws IOException,
KeeperException {
final String tableName = "testInstantSchemaChangeWhenTableIsDisabled";
conf = TEST_UTIL.getConfiguration();
LOG.info("Start testInstantSchemaChangeWhenTableIsDisabled()");
HTable ht = createTableAndValidate(tableName);
// Disable table
admin.disableTable("testInstantSchemaChangeWhenTableIsDisabled");
// perform schema changes
HColumnDescriptor hcd = new HColumnDescriptor("newFamily");
admin.addColumn(Bytes.toBytes(tableName), hcd);
MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
assertFalse(msct.doesSchemaChangeNodeExists(tableName));
ht.close();
}
/**
* Test that when concurrent alter requests are received for a table we don't miss any.
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testConcurrentInstantSchemaChangeForAddColumn() throws IOException,
KeeperException, InterruptedException {
final String tableName = "testConcurrentInstantSchemaChangeForModifyTable";
conf = TEST_UTIL.getConfiguration();
LOG.info("Start testConcurrentInstantSchemaChangeForModifyTable()");
HTable ht = createTableAndValidate(tableName);
Runnable run1 = new Runnable() {
public void run() {
HColumnDescriptor hcd = new HColumnDescriptor("family1");
try {
admin.addColumn(Bytes.toBytes(tableName), hcd);
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
};
Runnable run2 = new Runnable() {
public void run() {
HColumnDescriptor hcd = new HColumnDescriptor("family2");
try {
admin.addColumn(Bytes.toBytes(tableName), hcd);
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
};
run1.run();
// We have to add a sleep here as in concurrent scenarios the HTD update
// in HDFS fails and returns with null HTD. This needs to be investigated,
// but it doesn't impact the instant alter functionality in any way.
Thread.sleep(100);
run2.run();
waitForSchemaChangeProcess(tableName);
Put put1 = new Put(row);
put1.add(Bytes.toBytes("family1"), qualifier, value);
ht.put(put1);
Get get1 = new Get(row);
get1.addColumn(Bytes.toBytes("family1"), qualifier);
Result r = ht.get(get1);
byte[] tvalue = r.getValue(Bytes.toBytes("family1"), qualifier);
int result = Bytes.compareTo(value, tvalue);
assertEquals(result, 0);
Thread.sleep(10000);
Put put2 = new Put(row);
put2.add(Bytes.toBytes("family2"), qualifier, value);
ht.put(put2);
Get get2 = new Get(row);
get2.addColumn(Bytes.toBytes("family2"), qualifier);
Result r2 = ht.get(get2);
byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
int result2 = Bytes.compareTo(value, tvalue2);
assertEquals(result2, 0);
LOG.info("END testConcurrentInstantSchemaChangeForModifyTable()");
ht.close();
}
/**
* The schema change request blocks while a LB run is in progress. This
* test validates this behavior.
* @throws IOException
* @throws InterruptedException
* @throws KeeperException
*/
@Test
public void testConcurrentInstantSchemaChangeAndLoadBalancerRun() throws IOException,
InterruptedException, KeeperException {
final String tableName = "testInstantSchemaChangeWithLoadBalancerRunning";
conf = TEST_UTIL.getConfiguration();
LOG.info("Start testInstantSchemaChangeWithLoadBalancerRunning()");
final String newFamily = "newFamily";
HTable ht = createTableAndValidate(tableName);
final MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
Runnable balancer = new Runnable() {
public void run() {
// run the balancer now.
miniHBaseCluster.getMaster().balance();
}
};
Runnable schemaChanger = new Runnable() {
public void run() {
HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
try {
admin.addColumn(Bytes.toBytes(tableName), hcd);
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
};
balancer.run();
schemaChanger.run();
waitForSchemaChangeProcess(tableName, 40000);
assertFalse(msct.doesSchemaChangeNodeExists(tableName));
Put put1 = new Put(row);
put1.add(Bytes.toBytes(newFamily), qualifier, value);
LOG.info("******** Put into new column family ");
ht.put(put1);
ht.flushCommits();
LOG.info("******** Get from new column family ");
Get get1 = new Get(row);
get1.addColumn(Bytes.toBytes(newFamily), qualifier);
Result r = ht.get(get1);
byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
LOG.info(" Value put = " + value + " value from table = " + tvalue);
int result = Bytes.compareTo(value, tvalue);
assertEquals(result, 0);
LOG.info("End testInstantSchemaChangeWithLoadBalancerRunning() ");
ht.close();
}
/**
* This test validates two things. One is that the LoadBalancer does not run when a schema
* change process is in progress. The second thing is that it also checks that failed/expired
* schema changes are expired to unblock the load balancer run.
*
*/
@Test (timeout=70000)
public void testLoadBalancerBlocksDuringSchemaChangeRequests() throws KeeperException,
IOException, InterruptedException {
LOG.info("Start testConcurrentLoadBalancerSchemaChangeRequests() ");
final MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
// Test that the load balancer does not run while an in-flight schema
// change operation is in progress.
// Simulate a new schema change request.
msct.createSchemaChangeNode("testLoadBalancerBlocks", 0);
// The schema change node is created.
assertTrue(msct.doesSchemaChangeNodeExists("testLoadBalancerBlocks"));
// Now, request an explicit LB run.
Runnable balancer1 = new Runnable() {
public void run() {
// run the balancer now.
miniHBaseCluster.getMaster().balance();
}
};
balancer1.run();
// Load balancer should not run now.
assertFalse(miniHBaseCluster.getMaster().isLoadBalancerRunning());
LOG.debug("testLoadBalancerBlocksDuringSchemaChangeRequests Asserted");
LOG.info("End testLoadBalancerBlocksDuringSchemaChangeRequests()");
}
/**
* Test that instant schema change blocks while LB is running.
* @throws KeeperException
* @throws IOException
* @throws InterruptedException
*/
@Test (timeout=10000)
public void testInstantSchemaChangeBlocksDuringLoadBalancerRun() throws KeeperException,
IOException, InterruptedException {
final MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
final String tableName = "testInstantSchemaChangeBlocksDuringLoadBalancerRun";
conf = TEST_UTIL.getConfiguration();
LOG.info("Start testInstantSchemaChangeBlocksDuringLoadBalancerRun()");
final String newFamily = "newFamily";
createTableAndValidate(tableName);
// Test that the schema change request does not run while an in-flight LB run
// is in progress.
// First, request an explicit LB run.
Runnable balancer1 = new Runnable() {
public void run() {
// run the balancer now.
miniHBaseCluster.getMaster().balance();
}
};
Runnable schemaChanger = new Runnable() {
public void run() {
HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
try {
admin.addColumn(Bytes.toBytes(tableName), hcd);
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
};
Thread t1 = new Thread(balancer1);
Thread t2 = new Thread(schemaChanger);
t1.start();
t2.start();
// check that they both happen concurrently
Runnable balancerCheck = new Runnable() {
public void run() {
// check whether balancer is running.
while (!miniHBaseCluster.getMaster().isLoadBalancerRunning()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
try {
assertFalse(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks"));
} catch (KeeperException ke) {
ke.printStackTrace();
}
LOG.debug("Load Balancer is now running or skipped");
while (miniHBaseCluster.getMaster().isLoadBalancerRunning()) {
try {
Thread.sleep(10);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
assertFalse(miniHBaseCluster.getMaster().isLoadBalancerRunning());
try {
assertTrue(msct.doesSchemaChangeNodeExists(tableName));
} catch (KeeperException ke) {
ke.printStackTrace();
}
}
};
Thread t = new Thread(balancerCheck);
t.start();
t.join(1000);
// Load balancer should not run now.
//assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false);
// Schema change request node should now exist.
// assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks"));
LOG.debug("testInstantSchemaChangeBlocksDuringLoadBalancerRun Asserted");
LOG.info("End testInstantSchemaChangeBlocksDuringLoadBalancerRun() ");
}
/**
* To test the schema janitor (that it cleans expired/failed schema alter
* attempts) we simply create a fake table (one that doesn't exist, with a
* fake number of online regions) in ZK. This schema alter request will time
* out (after 30 seconds) and our janitor will clean it up.
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testInstantSchemaJanitor() throws IOException,
KeeperException, InterruptedException {
LOG.info("testInstantSchemaWithFailedExpiredOperations() ");
String fakeTableName = "testInstantSchemaWithFailedExpiredOperations";
MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
msct.createSchemaChangeNode(fakeTableName, 10);
LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName)
+ " created");
// Wait past the 30-second alter timeout so the janitor can clean up the node.
Thread.sleep(40000);
assertFalse(msct.doesSchemaChangeNodeExists(fakeTableName));
LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName)
+ " deleted");
LOG.info("END testInstantSchemaWithFailedExpiredOperations() ");
}
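/*
 * Illustrative sketch only -- hypothetical names, not the actual janitor chore
 * -- of the expiry policy this test exercises: any schema change node older
 * than the configured alter timeout (30 seconds in these tests) is treated as
 * failed/expired and removed.
 */
static class SchemaChangeJanitorSketch {
private final long alterTimeoutMillis;
private final java.util.Map<String, Long> nodeCreationTimes =
new java.util.HashMap<String, Long>();
SchemaChangeJanitorSketch(long alterTimeoutMillis) {
this.alterTimeoutMillis = alterTimeoutMillis;
}
// Record a schema change node, as createSchemaChangeNode would in ZK.
void trackNode(String table, long createdAtMillis) {
nodeCreationTimes.put(table, createdAtMillis);
}
// Invoked periodically (cf. "hbase.instant.schema.janitor.period"): delete
// every node that has outlived the alter timeout.
void chore(long nowMillis) {
java.util.Iterator<java.util.Map.Entry<String, Long>> it =
nodeCreationTimes.entrySet().iterator();
while (it.hasNext()) {
if (nowMillis - it.next().getValue() > alterTimeoutMillis) {
it.remove();
}
}
}
}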
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -1,313 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.ZKUtil;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.apache.zookeeper.KeeperException;
import org.junit.After;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestInstantSchemaChangeFailover {
final Log LOG = LogFactory.getLog(getClass());
private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
private HBaseAdmin admin;
private static MiniHBaseCluster miniHBaseCluster = null;
private Configuration conf;
private ZooKeeperWatcher zkw;
private static MasterSchemaChangeTracker msct = null;
private final byte [] row = Bytes.toBytes("row");
private final byte [] qualifier = Bytes.toBytes("qualifier");
final byte [] value = Bytes.toBytes("value");
@Before
public void setUp() throws Exception {
TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true);
TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 10000);
TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000);
// Two masters, five region servers.
miniHBaseCluster = TEST_UTIL.startMiniCluster(2, 5);
msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
}
@After
public void tearDown() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}
/**
* This is a pretty low-cost signalling mechanism. It is quite possible that we
* will miss the ZK node creation signal, as in some cases the schema change
* process happens rather quickly and our thread waiting for the ZK node creation
* might wait forever. The fool-proof strategy would be to listen for ZK events
* directly; a watcher-based sketch follows this method.
* @param tableName
* @throws KeeperException
* @throws InterruptedException
*/
private void waitForSchemaChangeProcess(final String tableName)
throws KeeperException, InterruptedException {
LOG.info("Waiting for ZK node creation for table = " + tableName);
final MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
final Runnable r = new Runnable() {
public void run() {
try {
while (!msct.doesSchemaChangeNodeExists(tableName)) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (KeeperException ke) {
ke.printStackTrace();
}
LOG.info("Waiting for ZK node deletion for table = " + tableName);
try {
while (msct.doesSchemaChangeNodeExists(tableName)) {
try {
Thread.sleep(20);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
} catch (KeeperException ke) {
ke.printStackTrace();
}
}
};
Thread t = new Thread(r);
t.start();
t.join(10000);
}
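/*
 * A watcher-based alternative (illustrative sketch, not used by these tests):
 * instead of polling, arm a ZooKeeper watch and block on a latch until the
 * schema change node is deleted. Assumes a raw org.apache.zookeeper.ZooKeeper
 * handle; the helper name is hypothetical.
 */
private void awaitNodeDeletion(final org.apache.zookeeper.ZooKeeper zk,
final String path) throws KeeperException, InterruptedException {
final java.util.concurrent.CountDownLatch deleted =
new java.util.concurrent.CountDownLatch(1);
org.apache.zookeeper.Watcher watcher = new org.apache.zookeeper.Watcher() {
public void process(org.apache.zookeeper.WatchedEvent event) {
if (event.getType() == Event.EventType.NodeDeleted) {
deleted.countDown();
}
}
};
// exists() both checks for the node and (re)arms the one-shot watch.
while (zk.exists(path, watcher) != null) {
// Wake up periodically in case the watch fired between checks.
if (deleted.await(100, java.util.concurrent.TimeUnit.MILLISECONDS)) {
break;
}
}
}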
/**
* Kill a random RS and see that the schema change can succeed.
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test (timeout=50000)
public void testInstantSchemaChangeWhileRSCrash() throws IOException,
KeeperException, InterruptedException {
LOG.info("Start testInstantSchemaChangeWhileRSCrash()");
zkw = miniHBaseCluster.getMaster().getZooKeeperWatcher();
final String tableName = "TestRSCrashDuringSchemaChange";
HTable ht = createTableAndValidate(tableName);
HColumnDescriptor hcd = new HColumnDescriptor("family2");
admin.addColumn(Bytes.toBytes(tableName), hcd);
miniHBaseCluster.getRegionServer(0).abort("Killing while instant schema change");
// Let the dust settle down
Thread.sleep(10000);
waitForSchemaChangeProcess(tableName);
Put put2 = new Put(row);
put2.add(Bytes.toBytes("family2"), qualifier, value);
ht.put(put2);
Get get2 = new Get(row);
get2.addColumn(Bytes.toBytes("family2"), qualifier);
Result r2 = ht.get(get2);
byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
int result2 = Bytes.compareTo(value, tvalue2);
assertEquals(0, result2);
String nodePath = msct.getSchemaChangeNodePathForTable(tableName);
assertEquals(-1, ZKUtil.checkExists(zkw, nodePath));
LOG.info("result2 = " + result2);
LOG.info("end testInstantSchemaChangeWhileRSCrash()");
ht.close();
}
/**
* Randomly bring down/up RS servers while a schema change is in progress. This
* test is the same as the one above, the only difference being that we intend
* to kill and start new RS instances while a schema change is in progress.
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test (timeout=70000)
public void testInstantSchemaChangeWhileRandomRSCrashAndStart() throws IOException,
KeeperException, InterruptedException {
LOG.info("Start testInstantSchemaChangeWhileRandomRSCrashAndStart()");
miniHBaseCluster.getRegionServer(4).abort("Killing RS 4");
// Start a new RS before the schema change.
// Commenting the start RS as it is failing with DFS user permission NPE.
//miniHBaseCluster.startRegionServer();
// Let the dust settle
Thread.sleep(10000);
final String tableName = "testInstantSchemaChangeWhileRandomRSCrashAndStart";
HTable ht = createTableAndValidate(tableName);
HColumnDescriptor hcd = new HColumnDescriptor("family2");
admin.addColumn(Bytes.toBytes(tableName), hcd);
// Kill 2 RS now.
miniHBaseCluster.getRegionServer(2).abort("Killing RS 2");
// Let the dust settle
Thread.sleep(10000);
// We are now left with three of the original five region servers.
waitForSchemaChangeProcess(tableName);
assertFalse(msct.doesSchemaChangeNodeExists(tableName));
Put put2 = new Put(row);
put2.add(Bytes.toBytes("family2"), qualifier, value);
ht.put(put2);
Get get2 = new Get(row);
get2.addColumn(Bytes.toBytes("family2"), qualifier);
Result r2 = ht.get(get2);
byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
int result2 = Bytes.compareTo(value, tvalue2);
assertEquals(0, result2);
LOG.info("result2 = " + result2);
LOG.info("end testInstantSchemaChangeWhileRandomRSCrashAndStart()");
ht.close();
}
/**
* Test scenario where the primary master is brought down while processing an
* alter request. This is a harder one, as it is very difficult to time this.
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test (timeout=50000)
public void testInstantSchemaChangeWhileMasterFailover() throws IOException,
KeeperException, InterruptedException {
LOG.info("Start testInstantSchemaChangeWhileMasterFailover()");
//Thread.sleep(5000);
final String tableName = "testInstantSchemaChangeWhileMasterFailover";
HTable ht = createTableAndValidate(tableName);
HColumnDescriptor hcd = new HColumnDescriptor("family2");
admin.addColumn(Bytes.toBytes(tableName), hcd);
// Kill primary master now.
Thread.sleep(50);
miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
// It may not be possible for us to check the schema change status using
// waitForSchemaChangeProcess, as our ZK session in MasterSchemaChangeTracker
// will be lost when the master dies and hence may not be accurate. So we rely
// on an old-fashioned sleep here.
Thread.sleep(25000);
Put put2 = new Put(row);
put2.add(Bytes.toBytes("family2"), qualifier, value);
ht.put(put2);
Get get2 = new Get(row);
get2.addColumn(Bytes.toBytes("family2"), qualifier);
Result r2 = ht.get(get2);
byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
int result2 = Bytes.compareTo(value, tvalue2);
assertEquals(0, result2);
LOG.info("result2 = " + result2);
LOG.info("end testInstantSchemaChangeWhileMasterFailover()");
ht.close();
}
/**
* Test master failover during a schema change request in ZK.
* We create a fake schema change request in ZK and abort the primary master
* mid-flight to simulate a master fail over scenario during a mid-flight
* schema change process. The new master's schema janitor will eventually
* cleanup this fake request after time out.
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Ignore
@Test
public void testInstantSchemaOperationsInZKForMasterFailover() throws IOException,
KeeperException, InterruptedException {
LOG.info("testInstantSchemaOperationsInZKForMasterFailover() ");
String tableName = "testInstantSchemaOperationsInZKForMasterFailover";
conf = TEST_UTIL.getConfiguration();
MasterSchemaChangeTracker activesct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
activesct.createSchemaChangeNode(tableName, 10);
LOG.debug(activesct.getSchemaChangeNodePathForTable(tableName)
+ " created");
assertTrue(activesct.doesSchemaChangeNodeExists(tableName));
// Kill primary master now.
miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
// Wait for 50 secs so that the schema janitor on the failed-over master kicks
// in and cleans up this failed/expired schema change request.
Thread.sleep(50000);
MasterSchemaChangeTracker newmsct = miniHBaseCluster.getMaster().getSchemaChangeTracker();
assertFalse(newmsct.doesSchemaChangeNodeExists(tableName));
LOG.debug(newmsct.getSchemaChangeNodePathForTable(tableName)
+ " deleted");
LOG.info("END testInstantSchemaOperationsInZKForMasterFailover() ");
}
private HTable createTableAndValidate(String tableName) throws IOException {
conf = TEST_UTIL.getConfiguration();
LOG.info("Start createTableAndValidate()");
HTableDescriptor[] tables = admin.listTables();
int numTables = 0;
if (tables != null) {
numTables = tables.length;
}
HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
HConstants.CATALOG_FAMILY);
tables = this.admin.listTables();
assertEquals(numTables + 1, tables.length);
LOG.info("created table = " + tableName);
return ht;
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -1,224 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LargeTests;
import org.apache.hadoop.hbase.io.hfile.Compression;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.zookeeper.KeeperException;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@Category(LargeTests.class)
public class TestInstantSchemaChangeSplit extends InstantSchemaChangeTestBase {
final Log LOG = LogFactory.getLog(getClass());
/**
* The objective of the following test is to validate that schema exclusions
* happen properly. When a RS dies or crashes mid-flight during a schema
* refresh, we exclude all online regions in that RS, as well as the RS itself,
* from the schema change process.
*
* @throws IOException
* @throws KeeperException
* @throws InterruptedException
*/
@Test
public void testInstantSchemaChangeExclusions() throws IOException,
KeeperException, InterruptedException {
MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
LOG.info("Start testInstantSchemaChangeExclusions() ");
String tableName = "testInstantSchemaChangeExclusions";
HTable ht = createTableAndValidate(tableName);
HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
hcd.setMaxVersions(99);
hcd.setBlockCacheEnabled(false);
HRegionServer hrs = findRSWithOnlineRegionFor(tableName);
//miniHBaseCluster.getRegionServer(0).abort("killed for test");
admin.modifyColumn(Bytes.toBytes(tableName), hcd);
hrs.abort("Aborting for tests");
hrs.getSchemaChangeTracker().setSleepTimeMillis(20000);
//admin.modifyColumn(Bytes.toBytes(tableName), hcd);
LOG.debug("Waiting for Schema Change process to complete");
waitForSchemaChangeProcess(tableName, 15000);
assertFalse(msct.doesSchemaChangeNodeExists(tableName));
// Sleep for some time so that our region is reassigned to some other RS
// by master.
Thread.sleep(10000);
List<HRegion> onlineRegions
= miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
assertTrue(!onlineRegions.isEmpty());
for (HRegion onlineRegion : onlineRegions) {
HTableDescriptor htd = onlineRegion.getTableDesc();
HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
assertFalse(tableHcd.isBlockCacheEnabled());
assertEquals(99, tableHcd.getMaxVersions());
}
LOG.info("End testInstantSchemaChangeExclusions() ");
ht.close();
}
/**
* This test validates that when a schema change request fails on the
* RS side, we appropriately register the failure in the master schema change
* tracker's node, as well as capture the error cause.
*
* Currently an alter request fails if an RS fails with an IO exception, say due
* to a missing or incorrect codec. With instant schema change the same failure
* happens, and we register the failure with the associated cause and also
* update the monitor status appropriately.
*
* The region(s) will be orphaned in both cases.
*
*/
@Test
public void testInstantSchemaChangeWhileRSOpenRegionFailure() throws IOException,
KeeperException, InterruptedException {
MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
LOG.info("Start testInstantSchemaChangeWhileRSOpenRegionFailure() ");
String tableName = "testInstantSchemaChangeWhileRSOpenRegionFailure";
HTable ht = createTableAndValidate(tableName);
// create 10 regions now
TEST_UTIL.createMultiRegions(conf, ht,
HConstants.CATALOG_FAMILY, 10);
// wait for all the regions to be assigned
Thread.sleep(10000);
List<HRegion> onlineRegions
= miniHBaseCluster.getRegions(Bytes.toBytes(tableName));
// Log how many regions are online before the alter.
LOG.info("Size of online regions = " + onlineRegions.size());
HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
hcd.setMaxVersions(99);
hcd.setBlockCacheEnabled(false);
// SNAPPY is presumably not available in the test environment, so reopening
// regions with this codec is expected to fail.
hcd.setCompressionType(Compression.Algorithm.SNAPPY);
admin.modifyColumn(Bytes.toBytes(tableName), hcd);
Thread.sleep(100);
assertTrue(msct.doesSchemaChangeNodeExists(tableName));
Thread.sleep(10000);
// Get the current alter status and validate that it is a failure with an
// appropriate error cause.
MasterSchemaChangeTracker.MasterAlterStatus mas = msct.getMasterAlterStatus(tableName);
assertTrue(mas != null);
assertEquals(MasterSchemaChangeTracker.MasterAlterStatus.AlterState.FAILURE,
mas.getCurrentAlterStatus());
assertTrue(mas.getErrorCause() != null);
LOG.info("End testInstantSchemaChangeWhileRSOpenRegionFailure() ");
ht.close();
}
@Test
public void testConcurrentInstantSchemaChangeAndSplit() throws IOException,
InterruptedException, KeeperException {
final String tableName = "testConcurrentInstantSchemaChangeAndSplit";
conf = TEST_UTIL.getConfiguration();
LOG.info("Start testConcurrentInstantSchemaChangeAndSplit()");
final String newFamily = "newFamily";
HTable ht = createTableAndValidate(tableName);
final MasterSchemaChangeTracker msct =
TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
// create 4 regions now
TEST_UTIL.createMultiRegions(conf, ht,
HConstants.CATALOG_FAMILY, 4);
int rowCount = TEST_UTIL.loadTable(ht, HConstants.CATALOG_FAMILY);
//assertRowCount(t, rowCount);
Runnable splitter = new Runnable() {
public void run() {
// run the splits now.
try {
LOG.info("Splitting table now ");
admin.split(Bytes.toBytes(tableName));
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
Runnable schemaChanger = new Runnable() {
public void run() {
HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
try {
admin.addColumn(Bytes.toBytes(tableName), hcd);
} catch (IOException ioe) {
ioe.printStackTrace();
}
}
};
// Run the schema change, then the split, back to back on the calling thread.
schemaChanger.run();
Thread.sleep(50);
splitter.run();
waitForSchemaChangeProcess(tableName, 40000);
Put put1 = new Put(row);
put1.add(Bytes.toBytes(newFamily), qualifier, value);
LOG.info("******** Put into new column family ");
ht.put(put1);
ht.flushCommits();
LOG.info("******** Get from new column family ");
Get get1 = new Get(row);
get1.addColumn(Bytes.toBytes(newFamily), qualifier);
Result r = ht.get(get1);
byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
LOG.info(" Value put = " + value + " value from table = " + tvalue);
int result = Bytes.compareTo(value, tvalue);
assertEquals(result, 0);
LOG.info("End testConcurrentInstantSchemaChangeAndSplit() ");
ht.close();
}
@org.junit.Rule
public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
}

View File

@ -360,6 +360,12 @@ class MockRegionServer implements HRegionInterface, RegionServerServices {
return null;
}
@Override
public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public HServerInfo getHServerInfo() throws IOException {
// TODO Auto-generated method stub
@ -513,17 +519,6 @@ class MockRegionServer implements HRegionInterface, RegionServerServices {
return null;
}
@Override
public List<HRegion> getOnlineRegions(byte[] tableName) throws IOException {
// TODO Auto-generated method stub
return null;
}
@Override
public void refreshRegion(HRegion hRegion) throws IOException {
// TODO Auto-generated method stub
}
@Override
public Configuration getConfiguration() {
return this.conf;

View File

@ -51,15 +51,12 @@ import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.executor.EventHandler;
import org.apache.hadoop.hbase.executor.ExecutorService;
import org.apache.hadoop.hbase.io.Reference;
import org.apache.hadoop.hbase.ipc.HRegionInterface;
import org.apache.hadoop.hbase.regionserver.Store;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Writables;
import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
import org.junit.Test;
import org.junit.experimental.categories.Category;
@ -155,6 +152,11 @@ public class TestCatalogJanitor {
this.asm = Mockito.mock(AssignmentManager.class);
}
@Override
public void checkTableModifiable(byte[] tableName) throws IOException {
//no-op
}
@Override
public void createTable(HTableDescriptor desc, byte[][] splitKeys)
throws IOException {
@ -171,11 +173,6 @@ public class TestCatalogJanitor {
return null;
}
public void checkTableModifiable(byte[] tableName,
EventHandler.EventType eventType)
throws IOException {
}
@Override
public MasterFileSystem getMasterFileSystem() {
return this.mfs;
@ -263,14 +260,6 @@ public class TestCatalogJanitor {
};
}
public MasterSchemaChangeTracker getSchemaChangeTracker() {
return null;
}
public RegionServerTracker getRegionServerTracker() {
return null;
}
@Override
public boolean isServerShutdownHandlerEnabled() {
return true;

View File

@ -63,9 +63,6 @@ public class MockRegionServerServices implements RegionServerServices {
return null;
}
public void refreshRegion(HRegion hRegion) throws IOException {
}
@Override
public void addToOnlineRegions(HRegion r) {
this.regions.put(r.getRegionInfo().getEncodedName(), r);