diff --git a/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java b/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java index 9c9c7cc4415..7defac0d046 100644 --- a/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java +++ b/src/main/java/org/apache/hadoop/hbase/LocalHBaseCluster.java @@ -309,10 +309,12 @@ public class LocalHBaseCluster { */ public HMaster getActiveMaster() { for (JVMClusterUtil.MasterThread mt : masterThreads) { - // Ensure that the current active master is not stopped. - // We don't want to return a stopping master as an active master. - if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) { - return mt.getMaster(); + if (mt.getMaster().isActiveMaster()) { + // Ensure that the current active master is not stopped. + // We don't want to return a stopping master as an active master. + if (mt.getMaster().isActiveMaster() && !mt.getMaster().isStopped()) { + return mt.getMaster(); + } } } return null; diff --git a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java index 62298587bd5..4121508b1e5 100644 --- a/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/executor/EventHandler.java @@ -144,12 +144,13 @@ public abstract class EventHandler implements Runnable, Comparable { * Constructor */ EventType(int value) {} - public boolean isSchemaChangeEvent() { + public boolean isOnlineSchemaChangeSupported() { return ( - this.equals(EventType.C_M_ADD_FAMILY) || - this.equals(EventType.C_M_DELETE_FAMILY) || - this.equals(EventType.C_M_MODIFY_FAMILY) || - this.equals(EventType.C_M_MODIFY_TABLE)); + this.equals(EventType.C_M_ADD_FAMILY) || + this.equals(EventType.C_M_DELETE_FAMILY) || + this.equals(EventType.C_M_MODIFY_FAMILY) || + this.equals(EventType.C_M_MODIFY_TABLE) + ); } } diff --git a/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java b/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java index ea7ae45924d..ce815474d51 100644 --- a/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java +++ b/src/main/java/org/apache/hadoop/hbase/ipc/HMasterInterface.java @@ -266,12 +266,4 @@ public interface HMasterInterface extends VersionedProtocol { * @return array of HTableDescriptor */ public HTableDescriptor[] getHTableDescriptors(List tableNames); - - /** - * Returns the current running status of load balancer. - * @return True if LoadBalancer is running now else False. 
- */ - public boolean isLoadBalancerRunning(); - - } diff --git a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java index 26e9552845c..c0c320446bc 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/AssignmentManager.java @@ -304,7 +304,7 @@ public class AssignmentManager extends ZooKeeperListener { List hris = MetaReader.getTableRegions(this.master.getCatalogTracker(), tableName); Integer pending = 0; - for(HRegionInfo hri : hris) { + for (HRegionInfo hri : hris) { String name = hri.getEncodedName(); if (regionsToReopen.containsKey(name) || regionsInTransition.containsKey(name)) { pending++; diff --git a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java index daf3b0703ef..a3b4f6f19e6 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/HMaster.java +++ b/src/main/java/org/apache/hadoop/hbase/master/HMaster.java @@ -71,7 +71,6 @@ import org.apache.hadoop.hbase.client.MetaScanner; import org.apache.hadoop.hbase.client.MetaScanner.MetaScannerVisitor; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.coprocessor.CoprocessorHost; -import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; import org.apache.hadoop.hbase.executor.ExecutorService.ExecutorType; import org.apache.hadoop.hbase.ipc.HBaseRPC; @@ -91,6 +90,7 @@ import org.apache.hadoop.hbase.master.handler.TableDeleteFamilyHandler; import org.apache.hadoop.hbase.master.handler.TableEventHandler; import org.apache.hadoop.hbase.master.handler.TableModifyFamilyHandler; import org.apache.hadoop.hbase.master.metrics.MasterMetrics; +import org.apache.hadoop.hbase.master.RegionPlan; import org.apache.hadoop.hbase.monitoring.MemoryBoundedLogMessageBuffer; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -107,7 +107,6 @@ import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterId; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.DrainingServerTracker; -import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -184,10 +183,7 @@ Server { private CatalogTracker catalogTracker; // Cluster status zk tracker and local setter private ClusterStatusTracker clusterStatusTracker; - - // Schema change tracker - private MasterSchemaChangeTracker schemaChangeTracker; - + // buffer for "fatal error" notices from region servers // in the cluster. This is only used for assisting // operations/debugging. @@ -215,18 +211,12 @@ Server { private CatalogJanitor catalogJanitorChore; private LogCleaner logCleaner; - private Thread schemaJanitorChore; private MasterCoprocessorHost cpHost; private final ServerName serverName; private TableDescriptors tableDescriptors; - // Whether or not schema alter changes go through ZK or not. 
- private boolean supportInstantSchemaChanges = false; - - private volatile boolean loadBalancerRunning = false; - // Time stamps for when a hmaster was started and when it became active private long masterStartTime; private long masterActiveTime; @@ -300,18 +290,6 @@ Server { this.zooKeeper = new ZooKeeperWatcher(conf, MASTER + ":" + isa.getPort(), this, true); this.rpcServer.startThreads(); this.metrics = new MasterMetrics(getServerName().toString()); - this.supportInstantSchemaChanges = getSupportInstantSchemaChanges(conf); - } - - /** - * Get whether instant schema change is on or not. - * @param c - * @return True if instant schema enabled. - */ - private boolean getSupportInstantSchemaChanges(final Configuration c) { - boolean b = c.getBoolean("hbase.instant.schema.alter.enabled", false); - LOG.debug("Instant schema change enabled=" + b + "."); - return b; } /** @@ -451,12 +429,6 @@ Server { boolean wasUp = this.clusterStatusTracker.isClusterUp(); if (!wasUp) this.clusterStatusTracker.setClusterUp(); - // initialize schema change tracker - this.schemaChangeTracker = new MasterSchemaChangeTracker(getZooKeeper(), - this, this, - conf.getInt("hbase.instant.schema.alter.timeout", 60000)); - this.schemaChangeTracker.start(); - LOG.info("Server active/primary master; " + this.serverName + ", sessionid=0x" + Long.toHexString(this.zooKeeper.getRecoverableZooKeeper().getSessionId()) + @@ -596,9 +568,6 @@ Server { this.catalogJanitorChore = new CatalogJanitor(this, this); Threads.setDaemonThreadRunning(catalogJanitorChore.getThread()); - // Schema janitor chore. - this.schemaJanitorChore = getAndStartSchemaJanitorChore(this); - registerMBean(); status.markComplete("Initialization successful"); @@ -811,15 +780,6 @@ Server { return this.tableDescriptors; } - @Override - public MasterSchemaChangeTracker getSchemaChangeTracker() { - return this.schemaChangeTracker; - } - - public RegionServerTracker getRegionServerTracker() { - return this.regionServerTracker; - } - /** @return InfoServer object. Maybe null.*/ public InfoServer getInfoServer() { return this.infoServer; @@ -931,28 +891,7 @@ Server { if (this.executorService != null) this.executorService.shutdown(); } - /** - * Start the schema janitor. This Janitor will periodically sweep the failed/expired schema - * changes. - * @param master - * @return - */ - private Thread getAndStartSchemaJanitorChore(final HMaster master) { - String name = master.getServerName() + "-SchemaJanitorChore"; - int schemaJanitorPeriod = - master.getConfiguration().getInt("hbase.instant.schema.janitor.period", 120000); - // Start up the schema janitor chore - Chore chore = new Chore(name, schemaJanitorPeriod, master) { - @Override - protected void chore() { - master.getSchemaChangeTracker().handleFailedOrExpiredSchemaChanges(); - } - }; - return Threads.setDaemonThreadRunning(chore.getThread()); - } - - - private Thread getAndStartBalancerChore(final HMaster master) { + private static Thread getAndStartBalancerChore(final HMaster master) { String name = master.getServerName() + "-BalancerChore"; int balancerPeriod = master.getConfiguration().getInt("hbase.balancer.period", 300000); @@ -973,10 +912,6 @@ Server { if (this.catalogJanitorChore != null) { this.catalogJanitorChore.interrupt(); } - if (this.schemaJanitorChore != null) { - this.schemaJanitorChore.interrupt(); - } - } @Override @@ -1058,15 +993,6 @@ Server { return balancerCutoffTime; } - - /** - * Check whether the Load Balancer is currently running. 
- * @return true if the Load balancer is currently running. - */ - public boolean isLoadBalancerRunning() { - return loadBalancerRunning; - } - @Override public boolean balance() { // If balance not true, don't run balancer. @@ -1074,33 +1000,23 @@ Server { // Do this call outside of synchronized block. int maximumBalanceTime = getBalancerCutoffTime(); long cutoffTime = System.currentTimeMillis() + maximumBalanceTime; - boolean balancerRan = false; + boolean balancerRan; synchronized (this.balancer) { - if (loadBalancerRunning) { - LOG.debug("Load balancer is currently running. Skipping the current execution."); - return false; - } - // Only allow one balance run at at time. if (this.assignmentManager.isRegionsInTransition()) { LOG.debug("Not running balancer because " + - this.assignmentManager.getRegionsInTransition().size() + - " region(s) in transition: " + - org.apache.commons.lang.StringUtils. + this.assignmentManager.getRegionsInTransition().size() + + " region(s) in transition: " + + org.apache.commons.lang.StringUtils. abbreviate(this.assignmentManager.getRegionsInTransition().toString(), 256)); return false; } if (this.serverManager.areDeadServersInProgress()) { LOG.debug("Not running balancer because processing dead regionserver(s): " + - this.serverManager.getDeadServers()); + this.serverManager.getDeadServers()); return false; } - if (schemaChangeTracker.isSchemaChangeInProgress()) { - LOG.debug("Schema change operation is in progress. Waiting for " + - "it to complete before running the load balancer."); - return false; - } - loadBalancerRunning = true; + if (this.cpHost != null) { try { if (this.cpHost.preBalance()) { @@ -1135,7 +1051,7 @@ Server { // if performing next balance exceeds cutoff time, exit the loop (System.currentTimeMillis() + (totalRegPlanExecTime / rpCount)) > cutoffTime) { LOG.debug("No more balancing till next balance run; maximumBalanceTime=" + - maximumBalanceTime); + maximumBalanceTime); break; } } @@ -1148,7 +1064,6 @@ Server { LOG.error("Error invoking master coprocessor postBalance()", ioe); } } - loadBalancerRunning = false; } return balancerRan; } @@ -1298,9 +1213,7 @@ Server { if (cpHost != null) { cpHost.preDeleteTable(tableName); } - this.executorService.submit(new DeleteTableHandler(tableName, this, this, this, - supportInstantSchemaChanges)); - + this.executorService.submit(new DeleteTableHandler(tableName, this, this)); if (cpHost != null) { cpHost.postDeleteTable(tableName); } @@ -1312,6 +1225,7 @@ Server { * @return Pair indicating the number of regions updated Pair.getFirst is the * regions that are yet to be updated Pair.getSecond is the total number * of regions of the table + * @throws IOException */ public Pair getAlterStatus(byte[] tableName) throws IOException { @@ -1319,44 +1233,9 @@ Server { // may overlap with other table operations or the table operation may // have completed before querying this API. We need to refactor to a // transaction system in the future to avoid these ambiguities. 
- if (supportInstantSchemaChanges) { - return getAlterStatusFromSchemaChangeTracker(tableName); - } return this.assignmentManager.getReopenStatus(tableName); } - /** - * Used by the client to identify if all regions have the schema updates - * - * @param tableName - * @return Pair indicating the status of the alter command - * @throws IOException - */ - private Pair getAlterStatusFromSchemaChangeTracker(byte[] tableName) - throws IOException { - MasterSchemaChangeTracker.MasterAlterStatus alterStatus = null; - try { - alterStatus = - this.schemaChangeTracker.getMasterAlterStatus(Bytes.toString(tableName)); - } catch (KeeperException ke) { - LOG.error("KeeperException while getting schema alter status for table = " - + Bytes.toString(tableName), ke); - } - if (alterStatus != null) { - LOG.debug("Getting AlterStatus from SchemaChangeTracker for table = " - + Bytes.toString(tableName) + " Alter Status = " - + alterStatus.toString()); - return new Pair(alterStatus.getNumberOfRegionsProcessed(), - alterStatus.getNumberOfRegionsToProcess()); - } else { - LOG.debug("MasterAlterStatus is NULL for table = " - + Bytes.toString(tableName)); - // should we throw IOException here as it makes more sense? - return new Pair(0,0); - } - } - - public void addColumn(byte [] tableName, HColumnDescriptor column) throws IOException { checkInitialized(); @@ -1365,8 +1244,7 @@ Server { return; } } - new TableAddFamilyHandler(tableName, column, this, this, - this, supportInstantSchemaChanges).process(); + new TableAddFamilyHandler(tableName, column, this, this).process(); if (cpHost != null) { cpHost.postAddColumn(tableName, column); } @@ -1380,8 +1258,7 @@ Server { return; } } - new TableModifyFamilyHandler(tableName, descriptor, this, this, - this, supportInstantSchemaChanges).process(); + new TableModifyFamilyHandler(tableName, descriptor, this, this).process(); if (cpHost != null) { cpHost.postModifyColumn(tableName, descriptor); } @@ -1395,8 +1272,7 @@ Server { return; } } - new TableDeleteFamilyHandler(tableName, c, this, this, - this, supportInstantSchemaChanges).process(); + new TableDeleteFamilyHandler(tableName, c, this, this).process(); if (cpHost != null) { cpHost.postDeleteColumn(tableName, c); } @@ -1408,7 +1284,7 @@ Server { cpHost.preEnableTable(tableName); } this.executorService.submit(new EnableTableHandler(this, tableName, - catalogTracker, assignmentManager, false)); + catalogTracker, assignmentManager, false)); if (cpHost != null) { cpHost.postEnableTable(tableName); @@ -1470,8 +1346,7 @@ Server { if (cpHost != null) { cpHost.preModifyTable(tableName, htd); } - TableEventHandler tblHandle = new ModifyTableHandler(tableName, htd, this, - this, this, supportInstantSchemaChanges); + TableEventHandler tblHandle = new ModifyTableHandler(tableName, htd, this, this); this.executorService.submit(tblHandle); tblHandle.waitForPersist(); @@ -1480,26 +1355,8 @@ Server { } } - private boolean isOnlineSchemaChangeAllowed() { - return conf.getBoolean( - "hbase.online.schema.update.enable", false); - } - @Override - public void checkTableModifiable(final byte [] tableName, - EventHandler.EventType eventType) - throws IOException { - preCheckTableModifiable(tableName); - if (!eventType.isSchemaChangeEvent() || - !isOnlineSchemaChangeAllowed()) { - if (!getAssignmentManager().getZKTable(). 
- isDisabledTable(Bytes.toString(tableName))) { - throw new TableNotDisabledException(tableName); - } - } - } - - private void preCheckTableModifiable(final byte[] tableName) + public void checkTableModifiable(final byte [] tableName) throws IOException { String tableNameStr = Bytes.toString(tableName); if (isCatalogTable(tableName)) { @@ -1508,6 +1365,10 @@ Server { if (!MetaReader.tableExists(getCatalogTracker(), tableNameStr)) { throw new TableNotFoundException(tableNameStr); } + if (!getAssignmentManager().getZKTable(). + isDisabledTable(Bytes.toString(tableName))) { + throw new TableNotDisabledException(tableName); + } } public void clearFromTransition(HRegionInfo hri) { diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java index 8a7da2e4217..befc7644080 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterFileSystem.java @@ -514,7 +514,7 @@ public class MasterFileSystem { */ public HTableDescriptor addColumn(byte[] tableName, HColumnDescriptor hcd) throws IOException { - LOG.debug("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " + + LOG.info("AddColumn. Table = " + Bytes.toString(tableName) + " HCD = " + hcd.toString()); HTableDescriptor htd = this.services.getTableDescriptors().get(tableName); if (htd == null) { diff --git a/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java b/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java index e1e6685891a..989e6758d79 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java +++ b/src/main/java/org/apache/hadoop/hbase/master/MasterServices.java @@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.TableDescriptors; import org.apache.hadoop.hbase.executor.EventHandler; import org.apache.hadoop.hbase.executor.ExecutorService; -import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.RegionServerTracker; /** @@ -56,15 +55,12 @@ public interface MasterServices extends Server { public ExecutorService getExecutorService(); /** - * Check table modifiable. i.e not ROOT or META and offlined for all commands except - * alter commands - * @param tableName - * @param eventType - * @throws IOException + * Check table is modifiable; i.e. exists and is offline. + * @param tableName Name of table to check. + * @throws TableNotDisabledException + * @throws TableNotFoundException */ - public void checkTableModifiable(final byte [] tableName, - EventHandler.EventType eventType) - throws IOException; + public void checkTableModifiable(final byte [] tableName) throws IOException; /** * Create a table using the given table definition. @@ -80,21 +76,8 @@ public interface MasterServices extends Server { */ public TableDescriptors getTableDescriptors(); - /** - * Get Master Schema change tracker - * @return - */ - public MasterSchemaChangeTracker getSchemaChangeTracker(); - - /** - * Return the Region server tracker. 
- * @return RegionServerTracker - */ - public RegionServerTracker getRegionServerTracker(); - /** * @return true if master enables ServerShutdownHandler; */ public boolean isServerShutdownHandlerEnabled(); - } diff --git a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java index 3515d4af708..1c253a0345a 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java +++ b/src/main/java/org/apache/hadoop/hbase/master/ServerManager.java @@ -345,15 +345,6 @@ public class ServerManager { } } - /** - * Exclude a RS from any pending schema change process. - * @param serverName - */ - private void excludeRegionServerFromSchemaChanges(final ServerName serverName) { - this.services.getSchemaChangeTracker() - .excludeRegionServerForSchemaChanges(serverName.getServerName()); - } - /* * Expire the passed server. Add it to list of deadservers and queue a * shutdown processing. @@ -365,7 +356,6 @@ public class ServerManager { this.deadNotExpiredServers.add(serverName); return; } - excludeRegionServerFromSchemaChanges(serverName); if (!this.onlineServers.containsKey(serverName)) { LOG.warn("Received expiration of " + serverName + " but server is not currently online"); diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java index 02bec37438e..bb3d5cca9d1 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/DeleteTableHandler.java @@ -28,7 +28,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.catalog.MetaEditor; -import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.AssignmentManager; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -40,11 +39,9 @@ public class DeleteTableHandler extends TableEventHandler { private static final Log LOG = LogFactory.getLog(DeleteTableHandler.class); public DeleteTableHandler(byte [] tableName, Server server, - final MasterServices masterServices, HMasterInterface masterInterface, - boolean instantChange) + final MasterServices masterServices) throws IOException { - super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices, - masterInterface, instantChange); + super(EventType.C_M_DELETE_TABLE, tableName, server, masterServices); // The next call fails if no such table. 
getTableDescriptor(); } diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java index 4a735ec3181..c11238e96fd 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/ModifyTableHandler.java @@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; @InterfaceAudience.Private @@ -35,11 +34,9 @@ public class ModifyTableHandler extends TableEventHandler { public ModifyTableHandler(final byte [] tableName, final HTableDescriptor htd, final Server server, - final MasterServices masterServices, final HMasterInterface masterInterface, - boolean instantModify) + final MasterServices masterServices) throws IOException { - super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices, - masterInterface, instantModify); + super(EventType.C_M_MODIFY_TABLE, tableName, server, masterServices); // Check table exists. getTableDescriptor(); // This is the new schema we are going to write out as this modification. diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java index 4ec3d684a1f..ec726e9c555 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/TableAddFamilyHandler.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; /** @@ -40,10 +39,8 @@ public class TableAddFamilyHandler extends TableEventHandler { private final HColumnDescriptor familyDesc; public TableAddFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, - Server server, final MasterServices masterServices, - HMasterInterface masterInterface, boolean instantChange) throws IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices, - masterInterface, instantChange); + Server server, final MasterServices masterServices) throws IOException { + super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); HTableDescriptor htd = getTableDescriptor(); if (htd.hasFamily(familyDesc.getName())) { throw new InvalidFamilyOperationException("Family '" + diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java index bfa624b80ff..0fb20b6256a 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/TableDeleteFamilyHandler.java @@ -26,7 +26,6 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; import 
org.apache.hadoop.hbase.util.Bytes; @@ -39,10 +38,8 @@ public class TableDeleteFamilyHandler extends TableEventHandler { private final byte [] familyName; public TableDeleteFamilyHandler(byte[] tableName, byte [] familyName, - Server server, final MasterServices masterServices, - HMasterInterface masterInterface, boolean instantChange) throws IOException { - super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices, - masterInterface, instantChange); + Server server, final MasterServices masterServices) throws IOException { + super(EventType.C_M_ADD_FAMILY, tableName, server, masterServices); HTableDescriptor htd = getTableDescriptor(); this.familyName = hasColumnFamily(htd, familyName); } diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java index 6d173bbc7b8..32fa7f293c7 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/TableEventHandler.java @@ -37,19 +37,13 @@ import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; import org.apache.hadoop.hbase.ServerName; import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.TableNotDisabledException; import org.apache.hadoop.hbase.catalog.MetaReader; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.executor.EventHandler; -import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.BulkReOpen; import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.monitoring.TaskMonitor; import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; -import org.apache.hadoop.hbase.zookeeper.ZKAssign; -import org.apache.hadoop.hbase.zookeeper.ZKUtil; -import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; import org.apache.zookeeper.KeeperException; import com.google.common.collect.Lists; @@ -66,25 +60,35 @@ import com.google.common.collect.Maps; public abstract class TableEventHandler extends EventHandler { private static final Log LOG = LogFactory.getLog(TableEventHandler.class); protected final MasterServices masterServices; - protected HMasterInterface master = null; protected final byte [] tableName; protected final String tableNameStr; - protected boolean instantAction = false; protected boolean persistedToZk = false; public TableEventHandler(EventType eventType, byte [] tableName, Server server, - MasterServices masterServices, HMasterInterface masterInterface, - boolean instantSchemaChange) + MasterServices masterServices) throws IOException { super(server, eventType); this.masterServices = masterServices; this.tableName = tableName; - this.masterServices.checkTableModifiable(tableName, eventType); + try { + this.masterServices.checkTableModifiable(tableName); + } catch (TableNotDisabledException ex) { + if (isOnlineSchemaChangeAllowed() + && eventType.isOnlineSchemaChangeSupported()) { + LOG.debug("Ignoring table not disabled exception " + + "for supporting online schema changes."); + } else { + throw ex; + } + } this.tableNameStr = Bytes.toString(this.tableName); - this.instantAction = instantSchemaChange; - this.master = masterInterface; } + private boolean isOnlineSchemaChangeAllowed() { + return this.server.getConfiguration().getBoolean( + 
"hbase.online.schema.update.enable", false); + } + @Override public void process() { try { @@ -94,7 +98,16 @@ public abstract class TableEventHandler extends EventHandler { MetaReader.getTableRegions(this.server.getCatalogTracker(), tableName); handleTableOperation(hris); - handleSchemaChanges(hris); + if (eventType.isOnlineSchemaChangeSupported() && this.masterServices. + getAssignmentManager().getZKTable(). + isEnabledTable(Bytes.toString(tableName))) { + if (reOpenAllRegions(hris)) { + LOG.info("Completed table operation " + eventType + " on table " + + Bytes.toString(tableName)); + } else { + LOG.warn("Error on reopening the regions"); + } + } } catch (IOException e) { LOG.error("Error manipulating table " + Bytes.toString(tableName), e); } catch (KeeperException e) { @@ -105,48 +118,13 @@ public abstract class TableEventHandler extends EventHandler { } } - private void handleSchemaChanges(List regions) - throws IOException { - if (instantAction && regions != null && !regions.isEmpty()) { - handleInstantSchemaChanges(regions); - } else { - handleRegularSchemaChanges(regions); - } - } - - - /** - * Perform schema changes only if the table is in enabled state. - * @return - */ - private boolean canPerformSchemaChange() { - return (eventType.isSchemaChangeEvent() && this.masterServices. - getAssignmentManager().getZKTable(). - isEnabledTable(Bytes.toString(tableName))); - } - - private void handleRegularSchemaChanges(List regions) - throws IOException { - if (canPerformSchemaChange()) { - this.masterServices.getAssignmentManager().setRegionsToReopen(regions); - setPersist(); - if (reOpenAllRegions(regions)) { - LOG.info("Completed table operation " + eventType + " on table " + - Bytes.toString(tableName)); - } else { - LOG.warn("Error on reopening the regions"); - } - } - } - public boolean reOpenAllRegions(List regions) throws IOException { boolean done = false; LOG.info("Bucketing regions by region server..."); HTable table = new HTable(masterServices.getConfiguration(), tableName); TreeMap> serverToRegions = Maps .newTreeMap(); - NavigableMap hriHserverMapping - = table.getRegionLocations(); + NavigableMap hriHserverMapping = table.getRegionLocations(); List reRegions = new ArrayList(); for (HRegionInfo hri : regions) { ServerName rsLocation = hriHserverMapping.get(hri); @@ -188,32 +166,6 @@ public abstract class TableEventHandler extends EventHandler { return done; } - /** - * Check whether any of the regions from the list of regions is undergoing a split. - * We simply check whether there is a unassigned node for any of the region and if so - * we return as true. - * @param regionInfos - * @return - */ - private boolean isSplitInProgress(List regionInfos) { - for (HRegionInfo hri : regionInfos) { - ZooKeeperWatcher zkw = this.masterServices.getZooKeeper(); - String node = ZKAssign.getNodeName(zkw, hri.getEncodedName()); - try { - if (ZKUtil.checkExists(zkw, node) != -1) { - LOG.debug("Region " + hri.getRegionNameAsString() + " is unassigned. Assuming" + - " that it is undergoing a split"); - return true; - } - } catch (KeeperException ke) { - LOG.debug("KeeperException while determining splits in progress.", ke); - // Assume no splits happening? - return false; - } - } - return false; - } - /** * Table modifications are processed asynchronously, but provide an API for * you to query their status. 
@@ -238,65 +190,6 @@ public abstract class TableEventHandler extends EventHandler { } } - /** - * Wait for region split transaction in progress (if any) - * @param regions - * @param status - */ - private void waitForInflightSplit(List regions, MonitoredTask status) { - while (isSplitInProgress(regions)) { - try { - status.setStatus("Alter Schema is waiting for split region to complete."); - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - protected void handleInstantSchemaChanges(List regions) { - if (regions == null || regions.isEmpty()) { - LOG.debug("Region size is null or empty. Ignoring alter request."); - return; - } - MonitoredTask status = TaskMonitor.get().createStatus( - "Handling alter table request for table = " + tableNameStr); - if (canPerformSchemaChange()) { - boolean prevBalanceSwitch = false; - try { - // turn off load balancer synchronously - prevBalanceSwitch = master.synchronousBalanceSwitch(false); - waitForInflightSplit(regions, status); - MasterSchemaChangeTracker masterSchemaChangeTracker = - this.masterServices.getSchemaChangeTracker(); - masterSchemaChangeTracker - .createSchemaChangeNode(Bytes.toString(tableName), - regions.size()); - while(!masterSchemaChangeTracker.doesSchemaChangeNodeExists( - Bytes.toString(tableName))) { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - status.markComplete("Created ZK node for handling the alter table request for table = " - + tableNameStr); - } catch (KeeperException e) { - LOG.warn("Instant schema change failed for table " + tableNameStr, e); - status.setStatus("Instant schema change failed for table " + tableNameStr - + " Cause = " + e.getCause()); - - } catch (IOException ioe) { - LOG.warn("Instant schema change failed for table " + tableNameStr, ioe); - status.setStatus("Instant schema change failed for table " + tableNameStr - + " Cause = " + ioe.getCause()); - } finally { - master.synchronousBalanceSwitch(prevBalanceSwitch); - } - } - } - /** * @return Table descriptor for this table * @throws TableExistsException diff --git a/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java b/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java index b4f8cd4c647..e1868a28e7a 100644 --- a/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java +++ b/src/main/java/org/apache/hadoop/hbase/master/handler/TableModifyFamilyHandler.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.InvalidFamilyOperationException; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ipc.HMasterInterface; import org.apache.hadoop.hbase.master.MasterServices; import org.apache.hadoop.hbase.util.Bytes; @@ -41,10 +40,8 @@ public class TableModifyFamilyHandler extends TableEventHandler { public TableModifyFamilyHandler(byte[] tableName, HColumnDescriptor familyDesc, Server server, - final MasterServices masterServices, - HMasterInterface masterInterface, boolean instantChange) throws IOException { - super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices, - masterInterface, instantChange); + final MasterServices masterServices) throws IOException { + super(EventType.C_M_MODIFY_FAMILY, tableName, server, masterServices); HTableDescriptor htd = getTableDescriptor(); hasColumnFamily(htd, familyDesc.getName()); this.familyDesc = 
familyDesc; diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java index 2913c2bcb09..1081fc625f9 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/CompactSplitThread.java @@ -157,29 +157,12 @@ public class CompactSplitThread implements CompactionRequestor { return false; } - /** - * Wait for mid-flight schema alter requests. (if any). We don't want to execute a split - * when a schema alter is in progress as we end up in an inconsistent state. - * @param tableName - */ - private void waitForInflightSchemaChange(String tableName) { - while (this.server.getSchemaChangeTracker() - .isSchemaChangeInProgress(tableName)) { - try { - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - public synchronized void requestSplit(final HRegion r, byte[] midKey) { if (midKey == null) { LOG.debug("Region " + r.getRegionNameAsString() + " not splittable because midkey=null"); return; } - waitForInflightSchemaChange(r.getRegionInfo().getTableNameAsString()); try { this.splits.execute(new SplitRequest(r, midKey, this.server)); if (LOG.isDebugEnabled()) { diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 4f8099929ff..e504929e2f9 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -148,7 +148,6 @@ import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.VersionInfo; import org.apache.hadoop.hbase.zookeeper.ClusterStatusTracker; import org.apache.hadoop.hbase.zookeeper.RootRegionTracker; -import org.apache.hadoop.hbase.zookeeper.SchemaChangeTracker; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZooKeeperNodeTracker; import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher; @@ -294,9 +293,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, // Cluster Status Tracker private ClusterStatusTracker clusterStatusTracker; - // Schema change Tracker - private SchemaChangeTracker schemaChangeTracker; - // Log Splitting Worker private SplitLogWorker splitLogWorker; @@ -599,11 +595,6 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, this.catalogTracker = new CatalogTracker(this.zooKeeper, this.conf, this, this.conf.getInt("hbase.regionserver.catalog.timeout", Integer.MAX_VALUE)); catalogTracker.start(); - - // Schema change tracker - this.schemaChangeTracker = new SchemaChangeTracker(this.zooKeeper, - this, this); - this.schemaChangeTracker.start(); } /** @@ -2901,26 +2892,9 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, splitRegion(regionInfo, null); } - /** - * Wait for mid-flight schema change requests. 
(if any) - * @param tableName - */ - private void waitForSchemaChange(String tableName) { - while (schemaChangeTracker.isSchemaChangeInProgress(tableName)) { - try { - LOG.debug("Schema alter is inprogress for table = " + tableName - + " Waiting for alter to complete before a split"); - Thread.sleep(100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - @Override public void splitRegion(HRegionInfo regionInfo, byte[] splitPoint) throws NotServingRegionException, IOException { - waitForSchemaChange(Bytes.toString(regionInfo.getTableName())); checkOpen(); HRegion region = getRegion(regionInfo.getRegionName()); region.flushcache(); @@ -3672,58 +3646,27 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, } /** - * Refresh schema changes for given region. - * @param hRegion HRegion to refresh - * @throws IOException - */ - public void refreshRegion(HRegion hRegion) throws IOException { - - if (hRegion != null) { + * Gets the online regions of the specified table. + * This method looks at the in-memory onlineRegions. It does not go to .META.. + * Only returns online regions. If a region on this table has been + * closed during a disable, etc., it will not be included in the returned list. + * So, the returned list may not necessarily be ALL regions in this table, its + * all the ONLINE regions in the table. + * @param tableName + * @return Online regions from tableName + */ + public List getOnlineRegions(byte[] tableName) { + List tableRegions = new ArrayList(); synchronized (this.onlineRegions) { - HRegionInfo regionInfo = hRegion.getRegionInfo(); - // Close the region - hRegion.close(); - // Remove from online regions - removeFromOnlineRegions(regionInfo.getEncodedName()); - // Get new HTD - HTableDescriptor htd = this.tableDescriptors.get(regionInfo.getTableName()); - LOG.debug("HTD for region = " + regionInfo.getRegionNameAsString() - + " Is = " + htd ); - HRegion region = - HRegion.openHRegion(hRegion.getRegionInfo(), htd, hlog, conf, - this, null); - // Add new region to the onlineRegions - addToOnlineRegions(region); + for (HRegion region: this.onlineRegions.values()) { + HRegionInfo regionInfo = region.getRegionInfo(); + if(Bytes.equals(regionInfo.getTableName(), tableName)) { + tableRegions.add(region); + } + } } + return tableRegions; } - } - - /** - * Gets the online regions of the specified table. - * This method looks at the in-memory onlineRegions. It does not go to .META.. - * Only returns online regions. If a region on this table has been - * closed during a disable, etc., it will not be included in the returned list. - * So, the returned list may not necessarily be ALL regions in this table, its - * all the ONLINE regions in the table. - * @param tableName - * @return Online regions from tableName - */ - public List getOnlineRegions(byte[] tableName) { - List tableRegions = new ArrayList(); - synchronized (this.onlineRegions) { - for (HRegion region: this.onlineRegions.values()) { - HRegionInfo regionInfo = region.getRegionInfo(); - if(Bytes.equals(regionInfo.getTableName(), tableName)) { - tableRegions.add(region); - } - } - } - return tableRegions; - } - - public SchemaChangeTracker getSchemaChangeTracker() { - return this.schemaChangeTracker; - } // used by org/apache/hbase/tmpl/regionserver/RSStatusTmpl.jamon (HBASE-4070). 
public String[] getCoprocessors() { @@ -3741,5 +3684,4 @@ public class HRegionServer implements HRegionInterface, HBaseRPCErrorHandler, mxBeanInfo); LOG.info("Registered RegionServer MXBean"); } - -} +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java index da9e5cfc6a1..b038ef36afa 100644 --- a/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java +++ b/src/main/java/org/apache/hadoop/hbase/regionserver/OnlineRegions.java @@ -19,12 +19,12 @@ */ package org.apache.hadoop.hbase.regionserver; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Server; - import java.io.IOException; import java.util.List; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.hbase.Server; + /** * Interface to Map of online regions. In the Map, the key is the region's * encoded name and the value is an {@link HRegion} instance. @@ -54,18 +54,12 @@ interface OnlineRegions extends Server { * null if named region is not member of the online regions. */ public HRegion getFromOnlineRegions(String encodedRegionName); - /** - * Get all online regions of a table in this RS. - * @param tableName - * @return List of HRegion - * @throws java.io.IOException - */ - public List getOnlineRegions(byte[] tableName) throws IOException; - /** - * Refresh a given region updating it with latest HTD info. - * @param hRegion - */ - public void refreshRegion(HRegion hRegion) throws IOException; - -} + /** + * Get all online regions of a table in this RS. + * @param tableName + * @return List of HRegion + * @throws java.io.IOException + */ + public List getOnlineRegions(byte[] tableName) throws IOException; +} \ No newline at end of file diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java deleted file mode 100644 index 6c723b47127..00000000000 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/MasterSchemaChangeTracker.java +++ /dev/null @@ -1,828 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.hadoop.hbase.zookeeper; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.master.MasterServices; -import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.monitoring.TaskMonitor; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.hadoop.io.Writable; -import org.apache.zookeeper.KeeperException; - -@InterfaceAudience.Private -public class MasterSchemaChangeTracker extends ZooKeeperNodeTracker { - public static final Log LOG = LogFactory.getLog(MasterSchemaChangeTracker.class); - private final MasterServices masterServices; - // Used by tests only. Do not change this. - private volatile int sleepTimeMillis = 0; - // schema changes pending more than this time will be timed out. - private long schemaChangeTimeoutMillis = 30000; - - /** - * Constructs a new ZK node tracker. - *

- *

After construction, use {@link #start} to kick off tracking. - * - * @param watcher - * @param abortable - */ - public MasterSchemaChangeTracker(ZooKeeperWatcher watcher, - Abortable abortable, MasterServices masterServices, - long schemaChangeTimeoutMillis) { - super(watcher, watcher.schemaZNode, abortable); - this.masterServices = masterServices; - this.schemaChangeTimeoutMillis = schemaChangeTimeoutMillis; - } - - @Override - public void start() { - try { - watcher.registerListener(this); - List tables = - ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode); - processCompletedSchemaChanges(tables); - } catch (KeeperException e) { - LOG.error("MasterSchemaChangeTracker startup failed.", e); - abortable.abort("MasterSchemaChangeTracker startup failed", e); - } - } - - private List getCurrentTables() throws KeeperException { - return - ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode); - } - - /** - * When a primary master crashes and the secondary master takes over - * mid-flight during an alter process, the secondary should cleanup any completed - * schema changes not handled by the previous master. - * @param tables - * @throws KeeperException - */ - private void processCompletedSchemaChanges(List tables) - throws KeeperException { - if (tables == null || tables.isEmpty()) { - String msg = "No current schema change in progress. Skipping cleanup"; - LOG.debug(msg); - return; - } - String msg = "Master seeing following tables undergoing schema change " + - "process. Tables = " + tables; - MonitoredTask status = TaskMonitor.get().createStatus(msg); - LOG.debug(msg); - for (String table : tables) { - LOG.debug("Processing table = "+ table); - status.setStatus("Processing table = "+ table); - try { - processTableNode(table); - } catch (IOException e) { - String errmsg = "IOException while processing completed schema changes." - + " Cause = " + e.getCause(); - LOG.error(errmsg, e); - status.setStatus(errmsg); - } - } - } - - /** - * Get current alter status for a table. - * @param tableName - * @return MasterAlterStatus - * @throws KeeperException - * @throws IOException - */ - public MasterAlterStatus getMasterAlterStatus(String tableName) - throws KeeperException, IOException { - String path = getSchemaChangeNodePathForTable(tableName); - byte[] state = ZKUtil.getData(watcher, path); - if (state == null || state.length <= 0) { - return null; - } - MasterAlterStatus mas = new MasterAlterStatus(); - Writables.getWritable(state, mas); - return mas; - } - - /** - * Get RS specific alter status for a table & server - * @param tableName - * @param serverName - * @return Region Server's Schema alter status - * @throws KeeperException - * @throws IOException - */ - private SchemaChangeTracker.SchemaAlterStatus getRSSchemaAlterStatus( - String tableName, String serverName) - throws KeeperException, IOException { - String childPath = - getSchemaChangeNodePathForTableAndServer(tableName, serverName); - byte[] childData = ZKUtil.getData(this.watcher, childPath); - if (childData == null || childData.length <= 0) { - return null; - } - SchemaChangeTracker.SchemaAlterStatus sas = - new SchemaChangeTracker.SchemaAlterStatus(); - Writables.getWritable(childData, sas); - LOG.debug("Schema Status data for server = " + serverName + " table = " - + tableName + " == " + sas); - return sas; - } - - /** - * Update the master's alter status based on all region server's response. 
- * @param servers - * @param tableName - * @throws IOException - */ - private void updateMasterAlterStatus(MasterAlterStatus mas, - List servers, String tableName) - throws IOException, KeeperException { - for (String serverName : servers) { - SchemaChangeTracker.SchemaAlterStatus sas = - getRSSchemaAlterStatus(tableName, serverName); - if (sas != null) { - mas.update(sas); - LOG.debug("processTableNodeWithState:Updated Master Alter Status = " - + mas + " for server = " + serverName); - } else { - LOG.debug("SchemaAlterStatus is NULL for table = " + tableName); - } - } - } - - /** - * If schema alter is handled for this table, then delete all the ZK nodes - * created for this table. - * @param tableName - * @throws KeeperException - */ - private void processTableNode(String tableName) throws KeeperException, - IOException { - LOG.debug("processTableNodeWithState. TableName = " + tableName); - List servers = - ZKUtil.listChildrenAndWatchThem(watcher, - getSchemaChangeNodePathForTable(tableName)); - MasterAlterStatus mas = getMasterAlterStatus(tableName); - if (mas == null) { - LOG.debug("MasterAlterStatus is NULL. Table = " + tableName); - return; - } - updateMasterAlterStatus(mas, servers, tableName); - LOG.debug("Current Alter status = " + mas); - String nodePath = getSchemaChangeNodePathForTable(tableName); - ZKUtil.updateExistingNodeData(this.watcher, nodePath, - Writables.getBytes(mas), getZKNodeVersion(nodePath)); - processAlterStatus(mas, tableName, servers); - } - - /** - * Evaluate the master alter status and determine the current status. - * @param alterStatus - * @param tableName - * @param servers - * @param status - */ - private void processAlterStatus(MasterAlterStatus alterStatus, - String tableName, List servers) - throws KeeperException { - if (alterStatus.getNumberOfRegionsToProcess() - == alterStatus.getNumberOfRegionsProcessed()) { - // schema change completed. - String msg = "All region servers have successfully processed the " + - "schema changes for table = " + tableName - + " . Deleting the schema change node for table = " - + tableName + " Region servers processed the schema change" + - " request = " + alterStatus.getProcessedHosts() - + " Total number of regions = " + alterStatus.getNumberOfRegionsToProcess() - + " Processed regions = " + alterStatus.getNumberOfRegionsProcessed(); - MonitoredTask status = TaskMonitor.get().createStatus( - "Checking alter schema request status for table = " + tableName); - status.markComplete(msg); - LOG.debug(msg); - cleanProcessedTableNode(getSchemaChangeNodePathForTable(tableName)); - } else { - if (alterStatus.getErrorCause() != null - && alterStatus.getErrorCause().trim().length() > 0) { - String msg = "Alter schema change failed " - + "for table = " + tableName + " Number of online regions = " - + alterStatus.getNumberOfRegionsToProcess() + " processed regions count = " - + alterStatus.getNumberOfRegionsProcessed() - + " Original list = " + alterStatus.hostsToProcess + " Processed servers = " - + servers - + " Error Cause = " + alterStatus.getErrorCause(); - MonitoredTask status = TaskMonitor.get().createStatus( - "Checking alter schema request status for table = " + tableName); - // we have errors. 
- LOG.debug(msg); - status.abort(msg); - } else { - String msg = "Not all region servers have processed the schema changes" - + "for table = " + tableName + " Number of online regions = " - + alterStatus.getNumberOfRegionsToProcess() + " processed regions count = " - + alterStatus.getNumberOfRegionsProcessed() - + " Original list = " + alterStatus.hostsToProcess + " Processed servers = " - + servers + " Alter STate = " - + alterStatus.getCurrentAlterStatus(); - LOG.debug(msg); - // status.setStatus(msg); - } - } - } - - /** - * Check whether a in-flight schema change request has expired. - * @param tableName - * @return true is the schema change request expired. - * @throws IOException - */ - private boolean hasSchemaChangeExpiredFor(String tableName) - throws IOException, KeeperException { - MasterAlterStatus mas = getMasterAlterStatus(tableName); - long createdTimeStamp = mas.getStamp(); - long duration = System.currentTimeMillis() - createdTimeStamp; - LOG.debug("Created TimeStamp = " + createdTimeStamp - + " duration = " + duration + " Table = " + tableName - + " Master Alter Status = " + mas); - return (duration > schemaChangeTimeoutMillis); - } - - /** - * Handle failed and expired schema changes. We simply delete all the - * expired/failed schema change attempts. Why we should do this ? - * 1) Keeping the failed/expired schema change nodes longer prohibits any - * future schema changes for the table. - * 2) Any lingering expired/failed schema change requests will prohibit the - * load balancer from running. - */ - public void handleFailedOrExpiredSchemaChanges() { - try { - List tables = getCurrentTables(); - for (String table : tables) { - String statmsg = "Cleaning failed or expired schema change requests. " + - "current tables undergoing " + - "schema change process = " + tables; - MonitoredTask status = TaskMonitor.get().createStatus(statmsg); - LOG.debug(statmsg); - if (hasSchemaChangeExpiredFor(table)) { - // time out.. currently, we abandon the in-flight schema change due to - // time out. - // Here, there are couple of options to consider. One could be to - // attempt a retry of the schema change and see if it succeeds, and - // another could be to simply rollback the schema change effort and - // see if it succeeds. - String msg = "Schema change for table = " + table + " has expired." - + " Schema change for this table has been in progress for " + - + schemaChangeTimeoutMillis + - "Deleting the node now."; - LOG.debug(msg); - ZKUtil.deleteNodeRecursively(this.watcher, - getSchemaChangeNodePathForTable(table)); - } else { - String msg = "Schema change request is in progress for " + - " table = " + table; - LOG.debug(msg); - status.setStatus(msg); - } - } - } catch (IOException e) { - String msg = "IOException during handleFailedExpiredSchemaChanges." - + e.getCause(); - LOG.error(msg, e); - TaskMonitor.get().createStatus(msg); - } catch (KeeperException ke) { - String msg = "KeeperException during handleFailedExpiredSchemaChanges." - + ke.getCause(); - LOG.error(msg, ke); - TaskMonitor.get().createStatus(msg); - } - } - - /** - * Clean the nodes of completed schema change table. 
- * @param path - * @throws KeeperException - */ - private void cleanProcessedTableNode(String path) throws KeeperException { - if (sleepTimeMillis > 0) { - try { - LOG.debug("Master schema change tracker sleeping for " - + sleepTimeMillis); - Thread.sleep(sleepTimeMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - ZKUtil.deleteNodeRecursively(this.watcher, path); - LOG.debug("Deleted all nodes for path " + path); - - } - - /** - * Exclude a RS from schema change request (if applicable) - * We will exclude a RS from schema change request processing if 1) RS - * has online regions for the table AND 2) RS went down mid-flight - * during schema change process. We don't have to deal with RS going - * down mid-flight during a schema change as the online regions from - * the dead RS will get reassigned to some other RS and the - * process of reassign inherently takes care of the schema change as well. - * @param serverName - */ - public void excludeRegionServerForSchemaChanges(String serverName) { - try { - MonitoredTask status = TaskMonitor.get().createStatus( - "Processing schema change exclusion for region server = " + serverName); - List tables = - ZKUtil.listChildrenNoWatch(watcher, watcher.schemaZNode); - if (tables == null || tables.isEmpty()) { - String msg = "No schema change in progress. Skipping exclusion for " + - "server = "+ serverName; - LOG.debug(msg); - status.setStatus(msg); - return ; - } - for (String tableName : tables) { - excludeRegionServer(tableName, serverName, status); - } - } catch(KeeperException ke) { - LOG.error("KeeperException during excludeRegionServerForSchemaChanges", ke); - } catch(IOException ioe) { - LOG.error("IOException during excludeRegionServerForSchemaChanges", ioe); - - } - } - - /** - * Check whether a schema change is in progress for a given table on a - * given RS. - * @param tableName - * @param serverName - * @return TRUE is this RS is currently processing a schema change request - * for the table. - * @throws KeeperException - */ - private boolean isSchemaChangeApplicableFor(String tableName, - String serverName) - throws KeeperException { - List servers = ZKUtil.listChildrenAndWatchThem(watcher, - getSchemaChangeNodePathForTable(tableName)); - return (servers.contains(serverName)); - } - - /** - * Exclude a region server for a table (if applicable) from schema change processing. - * @param tableName - * @param serverName - * @param status - * @throws KeeperException - * @throws IOException - */ - private void excludeRegionServer(String tableName, String serverName, - MonitoredTask status) - throws KeeperException, IOException { - if (isSchemaChangeApplicableFor(tableName, serverName)) { - String msg = "Excluding RS " + serverName + " from schema change process" + - " for table = " + tableName; - LOG.debug(msg); - status.setStatus(msg); - SchemaChangeTracker.SchemaAlterStatus sas = - getRSSchemaAlterStatus(tableName, serverName); - if (sas == null) { - LOG.debug("SchemaAlterStatus is NULL for table = " + tableName - + " server = " + serverName); - return; - } - // Set the status to IGNORED so we can process it accordingly. 
- sas.setCurrentAlterStatus( - SchemaChangeTracker.SchemaAlterStatus.AlterState.IGNORED); - LOG.debug("Updating the current schema status to " + sas); - String nodePath = getSchemaChangeNodePathForTableAndServer(tableName, - serverName); - ZKUtil.updateExistingNodeData(this.watcher, - nodePath, Writables.getBytes(sas), getZKNodeVersion(nodePath)); - } else { - LOG.debug("Skipping exclusion of RS " + serverName - + " from schema change process" - + " for table = " + tableName - + " as it did not possess any online regions for the table"); - } - processTableNode(tableName); - } - - private int getZKNodeVersion(String nodePath) throws KeeperException { - return ZKUtil.checkExists(this.watcher, nodePath); - } - - /** - * Create a new schema change ZK node. - * @param tableName Table name that is getting altered - * @throws KeeperException - */ - public void createSchemaChangeNode(String tableName, - int numberOfRegions) - throws KeeperException, IOException { - MonitoredTask status = TaskMonitor.get().createStatus( - "Creating schema change node for table = " + tableName); - LOG.debug("Creating schema change node for table = " - + tableName + " Path = " - + getSchemaChangeNodePathForTable(tableName)); - if (doesSchemaChangeNodeExists(tableName)) { - LOG.debug("Schema change node already exists for table = " + tableName - + " Deleting the schema change node."); - // If we already see a schema change node for this table we wait till the previous - // alter process is complete. Ideally, we need not wait and we could simply delete - // existing schema change node for this table and create new one. But then the - // RS cloud will not be able to process concurrent schema updates for the same table - // as they will be working with same set of online regions for this table. Meaning the - // second alter change will not see any online regions (as they were being closed and - // re opened by the first change) and will miss the second one. - // We either handle this at the RS level using explicit locks while processing a table - // or do it here. I prefer doing it here as it seems much simpler and cleaner. - while(doesSchemaChangeNodeExists(tableName)) { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } - - int rsCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.rsZNode); - // if number of online RS = 0, we should not do anything! - if (rsCount <= 0) { - String msg = "Master is not seeing any online region servers. Aborting the " + - "schema change processing by region servers."; - LOG.debug(msg); - status.abort(msg); - } else { - LOG.debug("Master is seeing " + rsCount + " region servers online before " + - "the schema change process."); - MasterAlterStatus mas = new MasterAlterStatus(numberOfRegions, - getActiveRegionServersAsString()); - LOG.debug("Master creating the master alter status = " + mas); - ZKUtil.createSetData(this.watcher, - getSchemaChangeNodePathForTable(tableName), Writables.getBytes(mas)); - status.markComplete("Created the ZK node for schema change. 
Current Alter Status = " - + mas.toString()); - ZKUtil.listChildrenAndWatchThem(this.watcher, - getSchemaChangeNodePathForTable(tableName)); - } - } - - private String getActiveRegionServersAsString() { - StringBuffer sbuf = new StringBuffer(); - List currentRS = - masterServices.getRegionServerTracker().getOnlineServers(); - for (ServerName serverName : currentRS) { - sbuf.append(serverName.getServerName()); - sbuf.append(" "); - } - LOG.debug("Current list of RS to process the schema change = " - + sbuf.toString()); - return sbuf.toString(); - } - - /** - * Create a new schema change ZK node. - * @param tableName - * @throws KeeperException - */ - public boolean doesSchemaChangeNodeExists(String tableName) - throws KeeperException { - return ZKUtil.checkExists(watcher, - getSchemaChangeNodePathForTable(tableName)) != -1; - } - - /** - * Check whether there are any schema change requests that are in progress now. - * We simply assume that a schema change is in progress if we see a ZK schema node for - * any table. We may revisit for fine grained checks such as check the current alter status - * et al, but it is not required now. - * @return - */ - public boolean isSchemaChangeInProgress() { - try { - int schemaChangeCount = ZKUtil.getNumberOfChildren(this.watcher, watcher.schemaZNode); - return schemaChangeCount > 0; - } catch (KeeperException ke) { - LOG.debug("KeeperException while getting current schema change progress."); - // What do we do now??? currently reporting as false. - } - return false; - } - - /** - * We get notified when a RS processes/or completed the schema change request. - * The path will be of the format /hbase/schema/ - * @param path full path of the node whose children have changed - */ - @Override - public void nodeChildrenChanged(String path) { - String tableName = null; - if (path.startsWith(watcher.schemaZNode) && - !path.equals(watcher.schemaZNode)) { - try { - LOG.debug("NodeChildrenChanged Path = " + path); - tableName = path.substring(path.lastIndexOf("/")+1, path.length()); - processTableNode(tableName); - } catch (KeeperException e) { - TaskMonitor.get().createStatus( - "MasterSchemaChangeTracker: ZK exception while processing " + - " nodeChildrenChanged() event for table = " + tableName - + " Cause = " + e.getCause()); - LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting" - + " schema change nodes", e); - } catch(IOException ioe) { - TaskMonitor.get().createStatus( - "MasterSchemaChangeTracker: ZK exception while processing " + - " nodeChildrenChanged() event for table = " + tableName - + " Cause = " + ioe.getCause()); - LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting" - + " schema change nodes", ioe); - } - } - } - - /** - * We get notified as and when the RS cloud updates their ZK nodes with - * progress information. The path will be of the format - * /hbase/schema/
/ - * @param path - */ - @Override - public void nodeDataChanged(String path) { - String tableName = null; - if (path.startsWith(watcher.schemaZNode) && - !path.equals(watcher.schemaZNode)) { - try { - LOG.debug("NodeDataChanged Path = " + path); - String[] paths = path.split("/"); - tableName = paths[3]; - processTableNode(tableName); - } catch (KeeperException e) { - TaskMonitor.get().createStatus( - "MasterSchemaChangeTracker: ZK exception while processing " + - " nodeDataChanged() event for table = " + tableName - + " Cause = " + e.getCause()); - LOG.error("MasterSchemaChangeTracker: Unexpected zk exception getting" - + " schema change nodes", e); - } catch(IOException ioe) { - TaskMonitor.get().createStatus( - "MasterSchemaChangeTracker: IO exception while processing " + - " nodeDataChanged() event for table = " + tableName - + " Cause = " + ioe.getCause()); - LOG.error("MasterSchemaChangeTracker: Unexpected IO exception getting" - + " schema change nodes", ioe); - - } - } - } - - public String getSchemaChangeNodePathForTable(String tableName) { - return ZKUtil.joinZNode(watcher.schemaZNode, tableName); - } - - /** - * Used only for tests. Do not use this. See TestInstantSchemaChange for more details - * on how this is getting used. This is primarily used to delay the schema complete - * processing by master so that we can test some complex scenarios such as - * master failover. - * @param sleepTimeMillis - */ - public void setSleepTimeMillis(int sleepTimeMillis) { - this.sleepTimeMillis = sleepTimeMillis; - } - - private String getSchemaChangeNodePathForTableAndServer( - String tableName, String regionServerName) { - return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName), - regionServerName); - } - - - /** - * Holds the current alter state for a table. Alter state includes the - * current alter status (INPROCESS, FAILURE or SUCCESS (success is not getting - * used now.), timestamp of alter request, number of hosts online at the time - * of alter request, number of online regions to process for the schema change - * request, number of processed regions and a list of region servers that - * actually processed the schema change request. - * - * Master keeps track of schema change requests using the alter status and - * periodically updates the alter status based on RS cloud processings. 
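
The bookkeeping described in the class comment above boils down to two counters plus a host list: regions still to process, regions reported as refreshed, and a deduction when a region server is exempted. A simplified model of that accounting with illustrative names and a plausible completion check; the real class also tracks hosts, timestamps, and error causes:

// Simplified model of the master-side alter bookkeeping described above.
final class AlterProgress {
  private int regionsToProcess;
  private int regionsProcessed;

  AlterProgress(int regionsToProcess) {
    this.regionsToProcess = regionsToProcess;
  }

  /** A region server reported that it refreshed some regions. */
  void addProcessed(int count) {
    regionsProcessed += count;
  }

  /** A region server was exempted; its online regions no longer count toward the target. */
  void ignoreServer(int onlineRegionsOnServer) {
    if (regionsToProcess > 0) {
      regionsToProcess -= onlineRegionsOnServer;
    }
  }

  boolean isComplete() {
    return regionsProcessed >= regionsToProcess;
  }

  public static void main(String[] args) {
    AlterProgress progress = new AlterProgress(10);
    progress.addProcessed(6);   // first RS finished 6 regions
    progress.ignoreServer(4);   // second RS died; its 4 regions will be reopened elsewhere
    System.out.println(progress.isComplete());  // true: 6 processed >= 6 remaining
  }
}
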
- */ - public static class MasterAlterStatus implements Writable { - - public enum AlterState { - INPROCESS, // Inprocess alter - SUCCESS, // completed alter - FAILURE // failure alter - } - - private AlterState currentAlterStatus; - // TimeStamp - private long stamp; - private int numberOfRegionsToProcess; - private StringBuffer errorCause = new StringBuffer(" "); - private StringBuffer processedHosts = new StringBuffer(" "); - private String hostsToProcess; - private int numberOfRegionsProcessed = 0; - - public MasterAlterStatus() { - - } - - public MasterAlterStatus(int numberOfRegions, String activeHosts) { - this.numberOfRegionsToProcess = numberOfRegions; - this.stamp = System.currentTimeMillis(); - this.currentAlterStatus = AlterState.INPROCESS; - //this.rsToProcess = activeHosts; - this.hostsToProcess = activeHosts; - } - - public AlterState getCurrentAlterStatus() { - return currentAlterStatus; - } - - public void setCurrentAlterStatus(AlterState currentAlterStatus) { - this.currentAlterStatus = currentAlterStatus; - } - - public long getStamp() { - return stamp; - } - - public void setStamp(long stamp) { - this.stamp = stamp; - } - - public int getNumberOfRegionsToProcess() { - return numberOfRegionsToProcess; - } - - public void setNumberOfRegionsToProcess(int numberOfRegionsToProcess) { - this.numberOfRegionsToProcess = numberOfRegionsToProcess; - } - - public int getNumberOfRegionsProcessed() { - return numberOfRegionsProcessed; - } - - public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) { - this.numberOfRegionsProcessed += numberOfRegionsProcessed; - } - - public String getHostsToProcess() { - return hostsToProcess; - } - - public void setHostsToProcess(String hostsToProcess) { - this.hostsToProcess = hostsToProcess; - } - - public String getErrorCause() { - return errorCause == null ? null : errorCause.toString(); - } - - public void setErrorCause(String errorCause) { - if (errorCause == null || errorCause.trim().length() <= 0) { - return; - } - if (this.errorCause == null) { - this.errorCause = new StringBuffer(errorCause); - } else { - this.errorCause.append(errorCause); - } - } - - public String getProcessedHosts() { - return processedHosts.toString(); - } - - public void setProcessedHosts(String processedHosts) { - if (this.processedHosts == null) { - this.processedHosts = new StringBuffer(processedHosts); - } else { - this.processedHosts.append(" ").append(processedHosts); - } - } - - /** - * Ignore or exempt a RS from schema change processing. - * Master will tweak the number of regions to process based on the - * number of online regions on the target RS and also remove the - * RS from list of hosts to process. - * @param schemaAlterStatus - */ - private void ignoreRSForSchemaChange( - SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) { - LOG.debug("Removing RS " + schemaAlterStatus.getHostName() - + " from schema change process."); - hostsToProcess = - hostsToProcess.replaceAll(schemaAlterStatus.getHostName(), ""); - int ignoreRegionsCount = schemaAlterStatus.getNumberOfOnlineRegions(); - LOG.debug("Current number of regions processed = " - + this.numberOfRegionsProcessed + " deducting ignored = " - + ignoreRegionsCount - + " final = " + (this.numberOfRegionsToProcess-ignoreRegionsCount)); - if (this.numberOfRegionsToProcess > 0) { - this.numberOfRegionsToProcess -= ignoreRegionsCount; - } else { - LOG.debug("Number of regions to process is less than zero. 
This is odd"); - } - } - - /** - * Update the master alter status for this table based on RS alter status. - * @param schemaAlterStatus - */ - public void update(SchemaChangeTracker.SchemaAlterStatus schemaAlterStatus) { - this.setProcessedHosts(schemaAlterStatus.getHostName()); - SchemaChangeTracker.SchemaAlterStatus.AlterState rsState = - schemaAlterStatus.getCurrentAlterStatus(); - switch(rsState) { - case FAILURE: - LOG.debug("Schema update failure Status = " - + schemaAlterStatus); - this.setCurrentAlterStatus( - MasterAlterStatus.AlterState.FAILURE); - this.setNumberOfRegionsProcessed( - schemaAlterStatus.getNumberOfRegionsProcessed()); - this.setErrorCause(schemaAlterStatus.getErrorCause()); - break; - case SUCCESS: - LOG.debug("Schema update SUCCESS Status = " - + schemaAlterStatus); - this.setNumberOfRegionsProcessed( - schemaAlterStatus.getNumberOfRegionsProcessed()); - this.setCurrentAlterStatus(MasterAlterStatus.AlterState.SUCCESS); - break; - case IGNORED: - LOG.debug("Schema update IGNORED Updating regions to " + - "process count. Status = "+ schemaAlterStatus); - ignoreRSForSchemaChange(schemaAlterStatus); - break; - default: - break; - } - } - - @Override - public void readFields(DataInput in) throws IOException { - currentAlterStatus = AlterState.valueOf(in.readUTF()); - stamp = in.readLong(); - numberOfRegionsToProcess = in.readInt(); - hostsToProcess = Bytes.toString(Bytes.readByteArray(in)); - processedHosts = new StringBuffer(Bytes.toString(Bytes.readByteArray(in))); - errorCause = new StringBuffer(Bytes.toString(Bytes.readByteArray(in))); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(currentAlterStatus.name()); - out.writeLong(stamp); - out.writeInt(numberOfRegionsToProcess); - Bytes.writeByteArray(out, Bytes.toBytes(hostsToProcess)); - Bytes.writeByteArray(out, Bytes.toBytes(processedHosts.toString())); - Bytes.writeByteArray(out, Bytes.toBytes(errorCause.toString())); - } - - @Override - public String toString() { - return - " state= " + currentAlterStatus - + ", ts= " + stamp - + ", number of regions to process = " + numberOfRegionsToProcess - + ", number of regions processed = " + numberOfRegionsProcessed - + ", hosts = " + hostsToProcess - + " , processed hosts = " + processedHosts - + " , errorCause = " + errorCause; - } - } -} diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java deleted file mode 100644 index 48d4ff774d8..00000000000 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/SchemaChangeTracker.java +++ /dev/null @@ -1,478 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
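
The readFields()/write() pair of MasterAlterStatus above follows the usual Hadoop Writable convention: fields are written and read back in the same order over a DataOutput/DataInput pair. A self-contained round trip of that pattern with a cut-down field set (state, timestamp, region count); names are illustrative and not part of this patch:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Round-trips a cut-down alter status the same way the Writable above does.
final class StatusRoundTrip {
  enum AlterState { INPROCESS, SUCCESS, FAILURE }

  static byte[] write(AlterState state, long stamp, int regions) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    out.writeUTF(state.name());   // enum serialized by name, as in write() above
    out.writeLong(stamp);
    out.writeInt(regions);
    out.flush();
    return bytes.toByteArray();
  }

  public static void main(String[] args) throws IOException {
    byte[] data = write(AlterState.INPROCESS, System.currentTimeMillis(), 12);
    DataInputStream in = new DataInputStream(new ByteArrayInputStream(data));
    AlterState state = AlterState.valueOf(in.readUTF());  // read back in the same order
    long stamp = in.readLong();
    int regions = in.readInt();
    System.out.println(state + " ts=" + stamp + " regions=" + regions);
  }
}
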
- */ - -package org.apache.hadoop.hbase.zookeeper; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.classification.InterfaceAudience; -import org.apache.hadoop.hbase.Abortable; -import org.apache.hadoop.hbase.monitoring.MonitoredTask; -import org.apache.hadoop.hbase.monitoring.TaskMonitor; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.RegionServerServices; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Writables; -import org.apache.zookeeper.KeeperException; -import org.apache.hadoop.hbase.util.Writables; - -import org.apache.hadoop.io.Writable; - -import java.io.*; -import java.util.List; - -/** - * Region server schema change tracker. RS uses this tracker to keep track of - * alter schema requests from master and updates the status once the schema change - * is complete. - */ -@InterfaceAudience.Private -public class SchemaChangeTracker extends ZooKeeperNodeTracker { - public static final Log LOG = LogFactory.getLog(SchemaChangeTracker.class); - private RegionServerServices regionServer = null; - private volatile int sleepTimeMillis = 0; - - - /** - * Constructs a new ZK node tracker. - *
- *
After construction, use {@link #start} to kick off tracking. - * - * @param watcher - * @param node - * @param abortable - */ - public SchemaChangeTracker(ZooKeeperWatcher watcher, - Abortable abortable, - RegionServerServices regionServer) { - super(watcher, watcher.schemaZNode, abortable); - this.regionServer = regionServer; - } - - @Override - public void start() { - try { - watcher.registerListener(this); - ZKUtil.listChildrenAndWatchThem(watcher, node); - // Clean-up old in-process schema changes for this RS now? - } catch (KeeperException e) { - LOG.error("RegionServer SchemaChangeTracker startup failed with " + - "KeeperException.", e); - } - } - - - /** - * This event will be triggered whenever new schema change request is processed by the - * master. The path will be of the format /hbase/schema/
<table name>
- * @param path full path of the node whose children have changed - */ - @Override - public void nodeChildrenChanged(String path) { - LOG.debug("NodeChildrenChanged. Path = " + path); - if (path.equals(watcher.schemaZNode)) { - try { - List tables = - ZKUtil.listChildrenAndWatchThem(watcher, watcher.schemaZNode); - LOG.debug("RS.SchemaChangeTracker: " + - "Current list of tables with schema change = " + tables); - if (tables != null) { - handleSchemaChange(tables); - } else { - LOG.error("No tables found for schema change event." + - " Skipping instant schema refresh"); - } - } catch (KeeperException ke) { - String errmsg = "KeeperException while handling nodeChildrenChanged for path = " - + path + " Cause = " + ke.getCause(); - LOG.error(errmsg, ke); - TaskMonitor.get().createStatus(errmsg); - } - } - } - - private void handleSchemaChange(List tables) { - for (String tableName : tables) { - if (tableName != null) { - LOG.debug("Processing schema change with status for table = " + tableName); - handleSchemaChange(tableName); - } - } - } - - private void handleSchemaChange(String tableName) { - int refreshedRegionsCount = 0, onlineRegionsCount = 0; - MonitoredTask status = null; - try { - List onlineRegions = - regionServer.getOnlineRegions(Bytes.toBytes(tableName)); - if (onlineRegions != null && !onlineRegions.isEmpty()) { - status = TaskMonitor.get().createStatus("Region server " - + regionServer.getServerName().getServerName() - + " handling schema change for table = " + tableName - + " number of online regions = " + onlineRegions.size()); - onlineRegionsCount = onlineRegions.size(); - createStateNode(tableName, onlineRegions.size()); - for (HRegion hRegion : onlineRegions) { - regionServer.refreshRegion(hRegion); - refreshedRegionsCount ++; - } - SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName); - alterStatus.update(SchemaAlterStatus.AlterState.SUCCESS, refreshedRegionsCount); - updateSchemaChangeStatus(tableName, alterStatus); - String msg = "Refresh schema completed for table name = " + tableName - + " server = " + regionServer.getServerName().getServerName() - + " online Regions = " + onlineRegions.size() - + " refreshed Regions = " + refreshedRegionsCount; - LOG.debug(msg); - status.setStatus(msg); - } else { - LOG.debug("Server " + regionServer.getServerName().getServerName() - + " has no online regions for table = " + tableName - + " Ignoring the schema change request"); - } - } catch (IOException ioe) { - reportAndLogSchemaRefreshError(tableName, onlineRegionsCount, - refreshedRegionsCount, ioe, status); - } catch (KeeperException ke) { - reportAndLogSchemaRefreshError(tableName, onlineRegionsCount, - refreshedRegionsCount, ke, status); - } - } - - private int getZKNodeVersion(String nodePath) throws KeeperException { - return ZKUtil.checkExists(this.watcher, nodePath); - } - - private void reportAndLogSchemaRefreshError(String tableName, - int onlineRegionsCount, - int refreshedRegionsCount, - Throwable exception, - MonitoredTask status) { - try { - String errmsg = - " Region Server " + regionServer.getServerName().getServerName() - + " failed during schema change process. 
Cause = " - + exception.getCause() - + " Number of onlineRegions = " + onlineRegionsCount - + " Processed regions = " + refreshedRegionsCount; - SchemaAlterStatus alterStatus = getSchemaAlterStatus(tableName); - alterStatus.update(SchemaAlterStatus.AlterState.FAILURE, - refreshedRegionsCount, errmsg); - String nodePath = getSchemaChangeNodePathForTableAndServer(tableName, - regionServer.getServerName().getServerName()); - ZKUtil.updateExistingNodeData(this.watcher, nodePath, - Writables.getBytes(alterStatus), getZKNodeVersion(nodePath)); - LOG.info("reportAndLogSchemaRefreshError() " + - " Updated child ZKNode with SchemaAlterStatus = " - + alterStatus + " for table = " + tableName); - if (status == null) { - status = TaskMonitor.get().createStatus(errmsg); - } else { - status.setStatus(errmsg); - } - } catch (KeeperException e) { - // Retry ? - String errmsg = "KeeperException while updating the schema change node with " - + "error status for table = " - + tableName + " server = " - + regionServer.getServerName().getServerName() - + " Cause = " + e.getCause(); - LOG.error(errmsg, e); - TaskMonitor.get().createStatus(errmsg); - } catch(IOException ioe) { - // retry ?? - String errmsg = "IOException while updating the schema change node with " - + "server name for table = " - + tableName + " server = " - + regionServer.getServerName().getServerName() - + " Cause = " + ioe.getCause(); - TaskMonitor.get().createStatus(errmsg); - LOG.error(errmsg, ioe); - } - } - - - private void createStateNode(String tableName, int numberOfOnlineRegions) - throws IOException { - SchemaAlterStatus sas = - new SchemaAlterStatus(regionServer.getServerName().getServerName(), - numberOfOnlineRegions); - LOG.debug("Creating Schema Alter State node = " + sas); - try { - ZKUtil.createSetData(this.watcher, - getSchemaChangeNodePathForTableAndServer(tableName, - regionServer.getServerName().getServerName()), - Writables.getBytes(sas)); - } catch (KeeperException ke) { - String errmsg = "KeeperException while creating the schema change node with " - + "server name for table = " - + tableName + " server = " - + regionServer.getServerName().getServerName() - + " Message = " + ke.getCause(); - LOG.error(errmsg, ke); - TaskMonitor.get().createStatus(errmsg); - } - - } - - private SchemaAlterStatus getSchemaAlterStatus(String tableName) - throws KeeperException, IOException { - byte[] statusBytes = ZKUtil.getData(this.watcher, - getSchemaChangeNodePathForTableAndServer(tableName, - regionServer.getServerName().getServerName())); - if (statusBytes == null || statusBytes.length <= 0) { - return null; - } - SchemaAlterStatus sas = new SchemaAlterStatus(); - Writables.getWritable(statusBytes, sas); - return sas; - } - - private void updateSchemaChangeStatus(String tableName, - SchemaAlterStatus schemaAlterStatus) - throws KeeperException, IOException { - try { - if(sleepTimeMillis > 0) { - try { - LOG.debug("SchemaChangeTracker sleeping for " - + sleepTimeMillis); - Thread.sleep(sleepTimeMillis); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - ZKUtil.updateExistingNodeData(this.watcher, - getSchemaChangeNodePathForTableAndServer(tableName, - regionServer.getServerName().getServerName()), - Writables.getBytes(schemaAlterStatus), -1); - String msg = "Schema change tracker completed for table = " + tableName - + " status = " + schemaAlterStatus; - LOG.debug(msg); - TaskMonitor.get().createStatus(msg); - } catch (KeeperException.NoNodeException e) { - String errmsg = 
"KeeperException.NoNodeException while updating the schema " - + "change node with server name for table = " - + tableName + " server = " - + regionServer.getServerName().getServerName() - + " Cause = " + e.getCause(); - TaskMonitor.get().createStatus(errmsg); - LOG.error(errmsg, e); - } catch (KeeperException e) { - // Retry ? - String errmsg = "KeeperException while updating the schema change node with " - + "server name for table = " - + tableName + " server = " - + regionServer.getServerName().getServerName() - + " Cause = " + e.getCause(); - LOG.error(errmsg, e); - TaskMonitor.get().createStatus(errmsg); - } catch(IOException ioe) { - String errmsg = "IOException while updating the schema change node with " - + "server name for table = " - + tableName + " server = " - + regionServer.getServerName().getServerName() - + " Cause = " + ioe.getCause(); - LOG.error(errmsg, ioe); - TaskMonitor.get().createStatus(errmsg); - } - } - - private String getSchemaChangeNodePathForTable(String tableName) { - return ZKUtil.joinZNode(watcher.schemaZNode, tableName); - } - - private String getSchemaChangeNodePathForTableAndServer( - String tableName, String regionServerName) { - return ZKUtil.joinZNode(getSchemaChangeNodePathForTable(tableName), - regionServerName); - } - - public int getSleepTimeMillis() { - return sleepTimeMillis; - } - - /** - * Set a sleep time in millis before this RS can update it's progress status. - * Used only for test cases to test complex test scenarios such as RS failures and - * RS exemption handling. - * @param sleepTimeMillis - */ - public void setSleepTimeMillis(int sleepTimeMillis) { - this.sleepTimeMillis = sleepTimeMillis; - } - - /** - * Check whether there are any schema change requests that are in progress now for the given table. - * We simply assume that a schema change is in progress if we see a ZK schema node this - * any table. We may revisit for fine grained checks such as check the current alter status - * et al, but it is not required now. - * @return - */ - public boolean isSchemaChangeInProgress(String tableName) { - try { - List schemaChanges = ZKUtil.listChildrenAndWatchThem(this.watcher, - watcher.schemaZNode); - if (schemaChanges != null) { - for (String alterTableName : schemaChanges) { - if (alterTableName.equals(tableName)) { - return true; - } - } - return false; - } - } catch (KeeperException ke) { - LOG.debug("isSchemaChangeInProgress. " + - "KeeperException while getting current schema change progress."); - return false; - } - return false; - } - - /** - * Holds the current alter state for a table. Alter state includes the - * current alter status (INPROCESS, FAILURE, SUCCESS, or IGNORED, current RS - * host name, timestamp of alter request, number of online regions this RS has for - * the given table, number of processed regions and an errorCause in case - * if the RS failed during the schema change process. - * - * RS keeps track of schema change requests per table using the alter status and - * periodically updates the alter status based on schema change status. - */ - public static class SchemaAlterStatus implements Writable { - - public enum AlterState { - INPROCESS, // Inprocess alter - SUCCESS, // completed alter - FAILURE, // failure alter - IGNORED // Ignore the alter processing. 
- } - - private AlterState currentAlterStatus; - // TimeStamp - private long stamp; - private int numberOfOnlineRegions; - private String errorCause = " "; - private String hostName; - private int numberOfRegionsProcessed = 0; - - public SchemaAlterStatus() { - - } - - public SchemaAlterStatus(String hostName, int numberOfOnlineRegions) { - this.numberOfOnlineRegions = numberOfOnlineRegions; - this.stamp = System.currentTimeMillis(); - this.currentAlterStatus = AlterState.INPROCESS; - //this.rsToProcess = activeHosts; - this.hostName = hostName; - } - - public AlterState getCurrentAlterStatus() { - return currentAlterStatus; - } - - public void setCurrentAlterStatus(AlterState currentAlterStatus) { - this.currentAlterStatus = currentAlterStatus; - } - - public int getNumberOfOnlineRegions() { - return numberOfOnlineRegions; - } - - public void setNumberOfOnlineRegions(int numberOfRegions) { - this.numberOfOnlineRegions = numberOfRegions; - } - - public int getNumberOfRegionsProcessed() { - return numberOfRegionsProcessed; - } - - public void setNumberOfRegionsProcessed(int numberOfRegionsProcessed) { - this.numberOfRegionsProcessed = numberOfRegionsProcessed; - } - - public String getErrorCause() { - return errorCause; - } - - public void setErrorCause(String errorCause) { - this.errorCause = errorCause; - } - - public String getHostName() { - return hostName; - } - - public void setHostName(String hostName) { - this.hostName = hostName; - } - - public void update(AlterState state, int numberOfRegions, String errorCause) { - this.currentAlterStatus = state; - this.numberOfRegionsProcessed = numberOfRegions; - this.errorCause = errorCause; - } - - public void update(AlterState state, int numberOfRegions) { - this.currentAlterStatus = state; - this.numberOfRegionsProcessed = numberOfRegions; - } - - public void update(AlterState state) { - this.currentAlterStatus = state; - } - - public void update(SchemaAlterStatus status) { - this.currentAlterStatus = status.getCurrentAlterStatus(); - this.numberOfRegionsProcessed = status.getNumberOfRegionsProcessed(); - this.errorCause = status.getErrorCause(); - } - - @Override - public void readFields(DataInput in) throws IOException { - currentAlterStatus = AlterState.valueOf(in.readUTF()); - stamp = in.readLong(); - numberOfOnlineRegions = in.readInt(); - hostName = Bytes.toString(Bytes.readByteArray(in)); - numberOfRegionsProcessed = in.readInt(); - errorCause = Bytes.toString(Bytes.readByteArray(in)); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(currentAlterStatus.name()); - out.writeLong(stamp); - out.writeInt(numberOfOnlineRegions); - Bytes.writeByteArray(out, Bytes.toBytes(hostName)); - out.writeInt(numberOfRegionsProcessed); - Bytes.writeByteArray(out, Bytes.toBytes(errorCause)); - } - - @Override - public String toString() { - return - " state= " + currentAlterStatus - + ", ts= " + stamp - + ", number of online regions = " + numberOfOnlineRegions - + ", host= " + hostName + " processed regions = " + numberOfRegionsProcessed - + ", errorCause = " + errorCause; - } - } - -} diff --git a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java index 0f83655744e..cc656236965 100644 --- a/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java +++ b/src/main/java/org/apache/hadoop/hbase/zookeeper/ZooKeeperWatcher.java @@ -102,8 +102,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable 
{ public String clusterIdZNode; // znode used for log splitting work assignment public String splitLogZNode; - // znode used to record table schema changes - public String schemaZNode; // Certain ZooKeeper nodes need to be world-readable public static final ArrayList CREATOR_ALL_AND_WORLD_READABLE = @@ -166,7 +164,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { ZKUtil.createAndFailSilent(this, drainingZNode); ZKUtil.createAndFailSilent(this, tableZNode); ZKUtil.createAndFailSilent(this, splitLogZNode); - ZKUtil.createAndFailSilent(this, schemaZNode); ZKUtil.createAndFailSilent(this, backupMasterAddressesZNode); } catch (KeeperException e) { throw new ZooKeeperConnectionException( @@ -215,8 +212,6 @@ public class ZooKeeperWatcher implements Watcher, Abortable, Closeable { conf.get("zookeeper.znode.clusterId", "hbaseid")); splitLogZNode = ZKUtil.joinZNode(baseZNode, conf.get("zookeeper.znode.splitlog", HConstants.SPLIT_LOGDIR_NAME)); - schemaZNode = ZKUtil.joinZNode(baseZNode, - conf.get("zookeeper.znode.schema", "schema")); } /** diff --git a/src/main/resources/hbase-default.xml b/src/main/resources/hbase-default.xml index 341431ace55..44ee6895ad7 100644 --- a/src/main/resources/hbase-default.xml +++ b/src/main/resources/hbase-default.xml @@ -780,39 +780,15 @@ - hbase.coprocessor.abortonerror - false - - Set to true to cause the hosting server (master or regionserver) to - abort if a coprocessor throws a Throwable object that is not IOException or - a subclass of IOException. Setting it to true might be useful in development - environments where one wants to terminate the server as soon as possible to - simplify coprocessor failure analysis. - - - - hbase.instant.schema.alter.enabled - false - Whether or not to handle alter schema changes instantly or not. - If enabled, all schema change alter operations will be instant, as the master will not - explicitly unassign/assign the impacted regions and instead will rely on Region servers to - refresh their schema changes. If enabled, the schema alter requests will survive - master or RS failures. - - - - hbase.instant.schema.janitor.period - 120000 - The Schema Janitor process wakes up every millis and sweeps all - expired/failed schema change requests. - - - - hbase.instant.schema.alter.timeout - 60000 - Timeout in millis after which any pending schema alter request will be - considered as failed. - + hbase.coprocessor.abortonerror + false + + Set to true to cause the hosting server (master or regionserver) to + abort if a coprocessor throws a Throwable object that is not IOException or + a subclass of IOException. Setting it to true might be useful in development + environments where one wants to terminate the server as soon as possible to + simplify coprocessor failure analysis. + hbase.online.schema.update.enable diff --git a/src/test/java/org/apache/hadoop/hbase/client/InstantSchemaChangeTestBase.java b/src/test/java/org/apache/hadoop/hbase/client/InstantSchemaChangeTestBase.java index 378c2b47dab..e69de29bb2d 100644 --- a/src/test/java/org/apache/hadoop/hbase/client/InstantSchemaChangeTestBase.java +++ b/src/test/java/org/apache/hadoop/hbase/client/InstantSchemaChangeTestBase.java @@ -1,169 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
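
The hbase-default.xml hunk above removes the three hbase.instant.schema.* properties along with the feature. For reference, they were ordinary Hadoop Configuration entries; a minimal sketch of reading them back with the defaults the XML declared (class name illustrative, assumes hadoop-common on the classpath):

import org.apache.hadoop.conf.Configuration;

// Reads the (now removed) instant-schema settings with the defaults the XML declared.
public class InstantSchemaSettings {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    boolean enabled = conf.getBoolean("hbase.instant.schema.alter.enabled", false);
    long janitorPeriodMillis = conf.getLong("hbase.instant.schema.janitor.period", 120000L);
    long alterTimeoutMillis = conf.getLong("hbase.instant.schema.alter.timeout", 60000L);
    System.out.println(enabled + " " + janitorPeriodMillis + " " + alterTimeoutMillis);
  }
}
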
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.assertEquals; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseTestingUtility; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.LargeTests; -import org.apache.hadoop.hbase.MiniHBaseCluster; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.JVMClusterUtil; -import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; -import org.apache.zookeeper.KeeperException; -import org.junit.After; -import org.junit.Before; -import org.junit.experimental.categories.Category; - -@Category(LargeTests.class) -public class InstantSchemaChangeTestBase { - - final Log LOG = LogFactory.getLog(getClass()); - protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - protected HBaseAdmin admin; - protected static MiniHBaseCluster miniHBaseCluster = null; - protected Configuration conf; - protected static MasterSchemaChangeTracker msct = null; - - protected final byte [] row = Bytes.toBytes("row"); - protected final byte [] qualifier = Bytes.toBytes("qualifier"); - final byte [] value = Bytes.toBytes("value"); - - @Before - public void setUpBeforeClass() throws Exception { - TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100); - TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250); - TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6); - TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true); - TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true); - TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 10000); - TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000); - // - miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5); - msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); - this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration()); - - } - - @After - public void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - /** - * Find the RS that is currently holding our online region. 
- * @param tableName - * @return - */ - protected HRegionServer findRSWithOnlineRegionFor(String tableName) { - List rsThreads = - miniHBaseCluster.getLiveRegionServerThreads(); - for (JVMClusterUtil.RegionServerThread rsT : rsThreads) { - HRegionServer rs = rsT.getRegionServer(); - List regions = rs.getOnlineRegions(Bytes.toBytes(tableName)); - if (regions != null && !regions.isEmpty()) { - return rs; - } - } - return null; - } - - protected void waitForSchemaChangeProcess(final String tableName) - throws KeeperException, InterruptedException { - waitForSchemaChangeProcess(tableName, 10000); - } - - /** - * This a pretty low cost signalling mechanism. It is quite possible that we will - * miss out the ZK node creation signal as in some cases the schema change process - * happens rather quickly and our thread waiting for ZK node creation might wait forver. - * The fool-proof strategy would be to directly listen for ZK events. - * @param tableName - * @throws KeeperException - * @throws InterruptedException - */ - protected void waitForSchemaChangeProcess(final String tableName, final long waitTimeMills) - throws KeeperException, InterruptedException { - LOG.info("Waiting for ZK node creation for table = " + tableName); - final MasterSchemaChangeTracker msct = - TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); - final Runnable r = new Runnable() { - public void run() { - try { - while(!msct.doesSchemaChangeNodeExists(tableName)) { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } catch (KeeperException ke) { - ke.printStackTrace(); - } - LOG.info("Waiting for ZK node deletion for table = " + tableName); - try { - while(msct.doesSchemaChangeNodeExists(tableName)) { - try { - Thread.sleep(50); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } catch (KeeperException ke) { - ke.printStackTrace(); - } - } - }; - Thread t = new Thread(r); - t.start(); - if (waitTimeMills > 0) { - t.join(waitTimeMills); - } else { - t.join(10000); - } - } - - protected HTable createTableAndValidate(String tableName) throws IOException { - conf = TEST_UTIL.getConfiguration(); - LOG.info("Start createTableAndValidate()"); - TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); - HTableDescriptor[] tables = admin.listTables(); - int numTables = 0; - if (tables != null) { - numTables = tables.length; - } - HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName), - HConstants.CATALOG_FAMILY); - tables = this.admin.listTables(); - assertEquals(numTables + 1, tables.length); - LOG.info("created table = " + tableName); - return ht; - } - -} \ No newline at end of file diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java b/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java deleted file mode 100644 index 4ac28474239..00000000000 --- a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChange.java +++ /dev/null @@ -1,473 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hbase.client; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.HColumnDescriptor; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.LargeTests; -import org.apache.hadoop.hbase.regionserver.HRegion; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker; -import org.apache.zookeeper.KeeperException; -import org.junit.Test; -import org.junit.experimental.categories.Category; - -@Category(LargeTests.class) -public class TestInstantSchemaChange extends InstantSchemaChangeTestBase { - - final Log LOG = LogFactory.getLog(getClass()); - - @Test - public void testInstantSchemaChangeForModifyTable() throws IOException, - KeeperException, InterruptedException { - - String tableName = "testInstantSchemaChangeForModifyTable"; - conf = TEST_UTIL.getConfiguration(); - LOG.info("Start testInstantSchemaChangeForModifyTable()"); - HTable ht = createTableAndValidate(tableName); - - String newFamily = "newFamily"; - HTableDescriptor htd = new HTableDescriptor(tableName); - htd.addFamily(new HColumnDescriptor(newFamily)); - - admin.modifyTable(Bytes.toBytes(tableName), htd); - waitForSchemaChangeProcess(tableName); - assertFalse(msct.doesSchemaChangeNodeExists(tableName)); - - Put put1 = new Put(row); - put1.add(Bytes.toBytes(newFamily), qualifier, value); - ht.put(put1); - - Get get1 = new Get(row); - get1.addColumn(Bytes.toBytes(newFamily), qualifier); - Result r = ht.get(get1); - byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); - int result = Bytes.compareTo(value, tvalue); - assertEquals(result, 0); - LOG.info("END testInstantSchemaChangeForModifyTable()"); - ht.close(); - } - - @Test - public void testInstantSchemaChangeForAddColumn() throws IOException, - KeeperException, InterruptedException { - LOG.info("Start testInstantSchemaChangeForAddColumn() "); - String tableName = "testSchemachangeForAddColumn"; - HTable ht = createTableAndValidate(tableName); - String newFamily = "newFamily"; - HColumnDescriptor hcd = new HColumnDescriptor("newFamily"); - - admin.addColumn(Bytes.toBytes(tableName), hcd); - waitForSchemaChangeProcess(tableName); - assertFalse(msct.doesSchemaChangeNodeExists(tableName)); - - Put put1 = new Put(row); - put1.add(Bytes.toBytes(newFamily), qualifier, value); - LOG.info("******** Put into new column family "); - ht.put(put1); - - Get get1 = new Get(row); - get1.addColumn(Bytes.toBytes(newFamily), qualifier); - Result r = ht.get(get1); - byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); - LOG.info(" Value put = " + value + " value from table = " + tvalue); - int result = Bytes.compareTo(value, tvalue); - assertEquals(result, 0); - LOG.info("End testInstantSchemaChangeForAddColumn() "); - ht.close(); - } - - @Test - 
public void testInstantSchemaChangeForModifyColumn() throws IOException, - KeeperException, InterruptedException { - LOG.info("Start testInstantSchemaChangeForModifyColumn() "); - String tableName = "testSchemachangeForModifyColumn"; - createTableAndValidate(tableName); - - HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY); - hcd.setMaxVersions(99); - hcd.setBlockCacheEnabled(false); - - admin.modifyColumn(Bytes.toBytes(tableName), hcd); - waitForSchemaChangeProcess(tableName); - assertFalse(msct.doesSchemaChangeNodeExists(tableName)); - - List onlineRegions - = miniHBaseCluster.getRegions(Bytes.toBytes("testSchemachangeForModifyColumn")); - for (HRegion onlineRegion : onlineRegions) { - HTableDescriptor htd = onlineRegion.getTableDesc(); - HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY); - assertTrue(tableHcd.isBlockCacheEnabled() == false); - assertEquals(tableHcd.getMaxVersions(), 99); - } - LOG.info("End testInstantSchemaChangeForModifyColumn() "); - - } - - @Test - public void testInstantSchemaChangeForDeleteColumn() throws IOException, - KeeperException, InterruptedException { - LOG.info("Start testInstantSchemaChangeForDeleteColumn() "); - String tableName = "testSchemachangeForDeleteColumn"; - int numTables = 0; - HTableDescriptor[] tables = admin.listTables(); - if (tables != null) { - numTables = tables.length; - } - - byte[][] FAMILIES = new byte[][] { - Bytes.toBytes("A"), Bytes.toBytes("B"), Bytes.toBytes("C") }; - - HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName), - FAMILIES); - tables = this.admin.listTables(); - assertEquals(numTables + 1, tables.length); - LOG.info("Table testSchemachangeForDeleteColumn created"); - - admin.deleteColumn(tableName, "C"); - - waitForSchemaChangeProcess(tableName); - assertFalse(msct.doesSchemaChangeNodeExists(tableName)); - HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(Bytes.toBytes(tableName)); - HColumnDescriptor hcd = modifiedHtd.getFamily(Bytes.toBytes("C")); - assertTrue(hcd == null); - LOG.info("End testInstantSchemaChangeForDeleteColumn() "); - ht.close(); - } - - @Test - public void testInstantSchemaChangeWhenTableIsNotEnabled() throws IOException, - KeeperException { - final String tableName = "testInstantSchemaChangeWhenTableIsDisabled"; - conf = TEST_UTIL.getConfiguration(); - LOG.info("Start testInstantSchemaChangeWhenTableIsDisabled()"); - HTable ht = createTableAndValidate(tableName); - // Disable table - admin.disableTable("testInstantSchemaChangeWhenTableIsDisabled"); - // perform schema changes - HColumnDescriptor hcd = new HColumnDescriptor("newFamily"); - admin.addColumn(Bytes.toBytes(tableName), hcd); - MasterSchemaChangeTracker msct = - TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); - assertTrue(msct.doesSchemaChangeNodeExists(tableName) == false); - ht.close(); - } - - /** - * Test that when concurrent alter requests are received for a table we don't miss any. 
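
Each of the tests above synchronizes on the schema-change znode through the base-class helper: poll until the marker node appears, then until it disappears, with an overall bound so a missed event cannot hang the test. A generic, JDK-only version of that bounded polling idiom (illustrative, not the helper the tests actually call):

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;

// Generic bounded polling wait, in the spirit of waitForSchemaChangeProcess().
final class PollingWait {
  /** Polls the condition every pollMillis until it holds or timeoutMillis elapses. */
  static boolean await(BooleanSupplier condition, long pollMillis, long timeoutMillis)
      throws InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
      if (condition.getAsBoolean()) {
        return true;
      }
      Thread.sleep(pollMillis);
    }
    return condition.getAsBoolean();
  }

  public static void main(String[] args) throws InterruptedException {
    AtomicBoolean nodeExists = new AtomicBoolean(false);
    new Thread(() -> {
      try { Thread.sleep(100L); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
      nodeExists.set(true);   // simulate the znode showing up
    }).start();
    System.out.println(await(nodeExists::get, 20L, 1000L));  // true once the flag flips
  }
}
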
- * @throws IOException - * @throws KeeperException - * @throws InterruptedException - */ - @Test - public void testConcurrentInstantSchemaChangeForAddColumn() throws IOException, - KeeperException, InterruptedException { - final String tableName = "testConcurrentInstantSchemaChangeForModifyTable"; - conf = TEST_UTIL.getConfiguration(); - LOG.info("Start testConcurrentInstantSchemaChangeForModifyTable()"); - HTable ht = createTableAndValidate(tableName); - - Runnable run1 = new Runnable() { - public void run() { - HColumnDescriptor hcd = new HColumnDescriptor("family1"); - try { - admin.addColumn(Bytes.toBytes(tableName), hcd); - } catch (IOException ioe) { - ioe.printStackTrace(); - - } - } - }; - Runnable run2 = new Runnable() { - public void run() { - HColumnDescriptor hcd = new HColumnDescriptor("family2"); - try { - admin.addColumn(Bytes.toBytes(tableName), hcd); - } catch (IOException ioe) { - ioe.printStackTrace(); - - } - } - }; - - run1.run(); - // We have to add a sleep here as in concurrent scenarios the HTD update - // in HDFS fails and returns with null HTD. This needs to be investigated, - // but it doesn't impact the instant alter functionality in any way. - Thread.sleep(100); - run2.run(); - - waitForSchemaChangeProcess(tableName); - - Put put1 = new Put(row); - put1.add(Bytes.toBytes("family1"), qualifier, value); - ht.put(put1); - - Get get1 = new Get(row); - get1.addColumn(Bytes.toBytes("family1"), qualifier); - Result r = ht.get(get1); - byte[] tvalue = r.getValue(Bytes.toBytes("family1"), qualifier); - int result = Bytes.compareTo(value, tvalue); - assertEquals(result, 0); - Thread.sleep(10000); - - Put put2 = new Put(row); - put2.add(Bytes.toBytes("family2"), qualifier, value); - ht.put(put2); - - Get get2 = new Get(row); - get2.addColumn(Bytes.toBytes("family2"), qualifier); - Result r2 = ht.get(get2); - byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier); - int result2 = Bytes.compareTo(value, tvalue2); - assertEquals(result2, 0); - LOG.info("END testConcurrentInstantSchemaChangeForModifyTable()"); - ht.close(); - } - - /** - * The schema change request blocks while a LB run is in progress. This - * test validates this behavior. - * @throws IOException - * @throws InterruptedException - * @throws KeeperException - */ - @Test - public void testConcurrentInstantSchemaChangeAndLoadBalancerRun() throws IOException, - InterruptedException, KeeperException { - final String tableName = "testInstantSchemaChangeWithLoadBalancerRunning"; - conf = TEST_UTIL.getConfiguration(); - LOG.info("Start testInstantSchemaChangeWithLoadBalancerRunning()"); - final String newFamily = "newFamily"; - HTable ht = createTableAndValidate(tableName); - final MasterSchemaChangeTracker msct = - TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); - - - Runnable balancer = new Runnable() { - public void run() { - // run the balancer now. 
- miniHBaseCluster.getMaster().balance(); - } - }; - - Runnable schemaChanger = new Runnable() { - public void run() { - HColumnDescriptor hcd = new HColumnDescriptor(newFamily); - try { - admin.addColumn(Bytes.toBytes(tableName), hcd); - } catch (IOException ioe) { - ioe.printStackTrace(); - - } - } - }; - - balancer.run(); - schemaChanger.run(); - waitForSchemaChangeProcess(tableName, 40000); - assertFalse(msct.doesSchemaChangeNodeExists(tableName)); - - Put put1 = new Put(row); - put1.add(Bytes.toBytes(newFamily), qualifier, value); - LOG.info("******** Put into new column family "); - ht.put(put1); - ht.flushCommits(); - - LOG.info("******** Get from new column family "); - Get get1 = new Get(row); - get1.addColumn(Bytes.toBytes(newFamily), qualifier); - Result r = ht.get(get1); - byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier); - LOG.info(" Value put = " + value + " value from table = " + tvalue); - int result = Bytes.compareTo(value, tvalue); - assertEquals(result, 0); - - LOG.info("End testInstantSchemaChangeWithLoadBalancerRunning() "); - ht.close(); - } - - - /** - * This test validates two things. One is that the LoadBalancer does not run when a schema - * change process is in progress. The second thing is that it also checks that failed/expired - * schema changes are expired to unblock the load balancer run. - * - */ - @Test (timeout=70000) - public void testLoadBalancerBlocksDuringSchemaChangeRequests() throws KeeperException, - IOException, InterruptedException { - LOG.info("Start testConcurrentLoadBalancerSchemaChangeRequests() "); - final MasterSchemaChangeTracker msct = - TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); - // Test that the load balancer does not run while an in-flight schema - // change operation is in progress. - // Simulate a new schema change request. - msct.createSchemaChangeNode("testLoadBalancerBlocks", 0); - // The schema change node is created. - assertTrue(msct.doesSchemaChangeNodeExists("testLoadBalancerBlocks")); - // Now, request an explicit LB run. - - Runnable balancer1 = new Runnable() { - public void run() { - // run the balancer now. - miniHBaseCluster.getMaster().balance(); - } - }; - balancer1.run(); - - // Load balancer should not run now. - assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false); - LOG.debug("testConcurrentLoadBalancerSchemaChangeRequests Asserted"); - LOG.info("End testConcurrentLoadBalancerSchemaChangeRequests() "); - } - - /** - * Test that instant schema change blocks while LB is running. - * @throws KeeperException - * @throws IOException - * @throws InterruptedException - */ - @Test (timeout=10000) - public void testInstantSchemaChangeBlocksDuringLoadBalancerRun() throws KeeperException, - IOException, InterruptedException { - final MasterSchemaChangeTracker msct = - TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); - - final String tableName = "testInstantSchemaChangeBlocksDuringLoadBalancerRun"; - conf = TEST_UTIL.getConfiguration(); - LOG.info("Start testInstantSchemaChangeBlocksDuringLoadBalancerRun()"); - final String newFamily = "newFamily"; - createTableAndValidate(tableName); - - // Test that the schema change request does not run while an in-flight LB run - // is in progress. - // First, request an explicit LB run. - - Runnable balancer1 = new Runnable() { - public void run() { - // run the balancer now. 
- miniHBaseCluster.getMaster().balance(); - } - }; - - Runnable schemaChanger = new Runnable() { - public void run() { - HColumnDescriptor hcd = new HColumnDescriptor(newFamily); - try { - admin.addColumn(Bytes.toBytes(tableName), hcd); - } catch (IOException ioe) { - ioe.printStackTrace(); - - } - } - }; - - Thread t1 = new Thread(balancer1); - Thread t2 = new Thread(schemaChanger); - t1.start(); - t2.start(); - - // check that they both happen concurrently - Runnable balancerCheck = new Runnable() { - public void run() { - // check whether balancer is running. - while(!miniHBaseCluster.getMaster().isLoadBalancerRunning()) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - try { - assertFalse(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks")); - } catch (KeeperException ke) { - ke.printStackTrace(); - } - LOG.debug("Load Balancer is now running or skipped"); - while(miniHBaseCluster.getMaster().isLoadBalancerRunning()) { - try { - Thread.sleep(10); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false); - try { - assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks")); - } catch (KeeperException ke) { - - } - - } - }; - - Thread t = new Thread(balancerCheck); - t.start(); - t.join(1000); - // Load balancer should not run now. - //assertTrue(miniHBaseCluster.getMaster().isLoadBalancerRunning() == false); - // Schema change request node should now exist. - // assertTrue(msct.doesSchemaChangeNodeExists("testSchemaChangeBlocks")); - LOG.debug("testInstantSchemaChangeBlocksDuringLoadBalancerRun Asserted"); - LOG.info("End testInstantSchemaChangeBlocksDuringLoadBalancerRun() "); - } - - /** - * To test the schema janitor (that it cleans expired/failed schema alter attempts) we - * simply create a fake table (that doesn't exist, with fake number of online regions) in ZK. - * This schema alter request will time out (after 30 seconds) and our janitor will clean it up. - * regions - * @throws IOException - * @throws KeeperException - * @throws InterruptedException - */ - @Test - public void testInstantSchemaJanitor() throws IOException, - KeeperException, InterruptedException { - LOG.info("testInstantSchemaWithFailedExpiredOperations() "); - String fakeTableName = "testInstantSchemaWithFailedExpiredOperations"; - MasterSchemaChangeTracker msct = - TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker(); - msct.createSchemaChangeNode(fakeTableName, 10); - LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName) - + " created"); - Thread.sleep(40000); - assertFalse(msct.doesSchemaChangeNodeExists(fakeTableName)); - LOG.debug(msct.getSchemaChangeNodePathForTable(fakeTableName) - + " deleted"); - LOG.info("END testInstantSchemaWithFailedExpiredOperations() "); - } - - - - @org.junit.Rule - public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu = - new org.apache.hadoop.hbase.ResourceCheckerJUnitRule(); -} diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java b/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java deleted file mode 100644 index c1490eb79ac..00000000000 --- a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java +++ /dev/null @@ -1,313 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
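
The two load-balancer tests above assert mutual exclusion in both directions: the balancer skips its run while a schema change is pending, and a schema change waits out an in-flight balancer run. A compact guard in that spirit, with illustrative names and no HBase types (a sketch, not the mechanism the master actually uses):

// Illustrative guard modeling the mutual exclusion the two tests above assert.
final class BalancerSchemaGuard {
  private boolean schemaChangeInProgress;
  private boolean balancerRunning;

  synchronized boolean tryStartBalance() {
    if (schemaChangeInProgress || balancerRunning) {
      return false;           // skip this balancer run
    }
    balancerRunning = true;
    return true;
  }

  synchronized void finishBalance() {
    balancerRunning = false;
  }

  synchronized boolean tryStartSchemaChange() {
    if (balancerRunning) {
      return false;           // caller should retry once the balancer finishes
    }
    schemaChangeInProgress = true;
    return true;
  }

  synchronized void finishSchemaChange() {
    schemaChangeInProgress = false;
  }

  public static void main(String[] args) {
    BalancerSchemaGuard guard = new BalancerSchemaGuard();
    System.out.println(guard.tryStartSchemaChange()); // true: nothing else running
    System.out.println(guard.tryStartBalance());      // false: schema change pending
    guard.finishSchemaChange();
    System.out.println(guard.tryStartBalance());      // true: balancer may run now
  }
}
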
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java b/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java
deleted file mode 100644
index c1490eb79ac..00000000000
--- a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeFailover.java
+++ /dev/null
@@ -1,313 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseTestingUtility;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.MiniHBaseCluster;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.hadoop.hbase.zookeeper.ZKUtil;
-import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
-import org.apache.zookeeper.KeeperException;
-import org.junit.After;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(LargeTests.class)
-public class TestInstantSchemaChangeFailover {
-
-  final Log LOG = LogFactory.getLog(getClass());
-  private final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
-  private HBaseAdmin admin;
-  private static MiniHBaseCluster miniHBaseCluster = null;
-  private Configuration conf;
-  private ZooKeeperWatcher zkw;
-  private static MasterSchemaChangeTracker msct = null;
-
-  private final byte [] row = Bytes.toBytes("row");
-  private final byte [] qualifier = Bytes.toBytes("qualifier");
-  final byte [] value = Bytes.toBytes("value");
-
-  @Before
-  public void setUpBeforeClass() throws Exception {
-    TEST_UTIL.getConfiguration().setInt("hbase.regionserver.msginterval", 100);
-    TEST_UTIL.getConfiguration().setInt("hbase.client.pause", 250);
-    TEST_UTIL.getConfiguration().setInt("hbase.client.retries.number", 6);
-    TEST_UTIL.getConfiguration().setBoolean("hbase.online.schema.update.enable", true);
-    TEST_UTIL.getConfiguration().setBoolean("hbase.instant.schema.alter.enabled", true);
-    TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.janitor.period", 10000);
-    TEST_UTIL.getConfiguration().setInt("hbase.instant.schema.alter.timeout", 30000);
-    //
-    miniHBaseCluster = TEST_UTIL.startMiniCluster(2,5);
-    msct = TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-    this.admin = new HBaseAdmin(TEST_UTIL.getConfiguration());
-  }
-
-  @After
-  public void tearDownAfterClass() throws Exception {
-    TEST_UTIL.shutdownMiniCluster();
-  }
-
-  /**
-   * This is a pretty low-cost signalling mechanism. It is quite possible that we will
-   * miss the ZK node creation signal, as in some cases the schema change process
-   * happens rather quickly and our thread waiting for ZK node creation might wait forever.
-   * The fool-proof strategy would be to directly listen for ZK events.
-   * @param tableName
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  private void waitForSchemaChangeProcess(final String tableName)
-      throws KeeperException, InterruptedException {
-    LOG.info("Waiting for ZK node creation for table = " + tableName);
-    final MasterSchemaChangeTracker msct =
-        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-    final Runnable r = new Runnable() {
-      public void run() {
-        try {
-          while (!msct.doesSchemaChangeNodeExists(tableName)) {
-            try {
-              Thread.sleep(20);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-            }
-          }
-        } catch (KeeperException ke) {
-          ke.printStackTrace();
-        }
-
-        LOG.info("Waiting for ZK node deletion for table = " + tableName);
-        try {
-          while (msct.doesSchemaChangeNodeExists(tableName)) {
-            try {
-              Thread.sleep(20);
-            } catch (InterruptedException e) {
-              Thread.currentThread().interrupt();
-            }
-          }
-        } catch (KeeperException ke) {
-          ke.printStackTrace();
-        }
-      }
-    };
-    Thread t = new Thread(r);
-    t.start();
-    t.join(10000);
-  }
-
-
-  /**
-   * Kill a random RS and see that the schema change can succeed.
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  @Test (timeout=50000)
-  public void testInstantSchemaChangeWhileRSCrash() throws IOException,
-      KeeperException, InterruptedException {
-    LOG.info("Start testInstantSchemaChangeWhileRSCrash()");
-    zkw = miniHBaseCluster.getMaster().getZooKeeperWatcher();
-
-    final String tableName = "TestRSCrashDuringSchemaChange";
-    HTable ht = createTableAndValidate(tableName);
-    HColumnDescriptor hcd = new HColumnDescriptor("family2");
-    admin.addColumn(Bytes.toBytes(tableName), hcd);
-
-    miniHBaseCluster.getRegionServer(0).abort("Killing while instant schema change");
-    // Let the dust settle down
-    Thread.sleep(10000);
-    waitForSchemaChangeProcess(tableName);
-    Put put2 = new Put(row);
-    put2.add(Bytes.toBytes("family2"), qualifier, value);
-    ht.put(put2);
-
-    Get get2 = new Get(row);
-    get2.addColumn(Bytes.toBytes("family2"), qualifier);
-    Result r2 = ht.get(get2);
-    byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
-    int result2 = Bytes.compareTo(value, tvalue2);
-    assertEquals(result2, 0);
-    String nodePath = msct.getSchemaChangeNodePathForTable("TestRSCrashDuringSchemaChange");
-    assertTrue(ZKUtil.checkExists(zkw, nodePath) == -1);
-    LOG.info("result2 = " + result2);
-    LOG.info("end testInstantSchemaChangeWhileRSCrash()");
-    ht.close();
-  }
-
-  /**
-   * Randomly bring down/up RS servers while schema change is in progress. This test
-   * is the same as the previous one, except that we intend to kill and start
-   * new RS instances while a schema change is in progress.
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  @Test (timeout=70000)
-  public void testInstantSchemaChangeWhileRandomRSCrashAndStart() throws IOException,
-      KeeperException, InterruptedException {
-    LOG.info("Start testInstantSchemaChangeWhileRandomRSCrashAndStart()");
-    miniHBaseCluster.getRegionServer(4).abort("Killing RS 4");
-    // Start a new RS before the schema change.
-    // Commenting the start RS as it is failing with DFS user permission NPE.
-    //miniHBaseCluster.startRegionServer();
-
-    // Let the dust settle
-    Thread.sleep(10000);
-    final String tableName = "testInstantSchemaChangeWhileRandomRSCrashAndStart";
-    HTable ht = createTableAndValidate(tableName);
-    HColumnDescriptor hcd = new HColumnDescriptor("family2");
-    admin.addColumn(Bytes.toBytes(tableName), hcd);
-    // Kill 2 RS now.
-    miniHBaseCluster.getRegionServer(2).abort("Killing RS 2");
-    // Let the dust settle
-    Thread.sleep(10000);
-    // We will be left with only one RS.
-    waitForSchemaChangeProcess(tableName);
-    assertFalse(msct.doesSchemaChangeNodeExists(tableName));
-    Put put2 = new Put(row);
-    put2.add(Bytes.toBytes("family2"), qualifier, value);
-    ht.put(put2);
-
-    Get get2 = new Get(row);
-    get2.addColumn(Bytes.toBytes("family2"), qualifier);
-    Result r2 = ht.get(get2);
-    byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
-    int result2 = Bytes.compareTo(value, tvalue2);
-    assertEquals(result2, 0);
-    LOG.info("result2 = " + result2);
-    LOG.info("end testInstantSchemaChangeWhileRandomRSCrashAndStart()");
-    ht.close();
-  }
-
-  /**
-   * Test scenario where the primary master is brought down while processing an
-   * alter request. This is a harder one, as it is very difficult to time this.
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-
-  @Test (timeout=50000)
-  public void testInstantSchemaChangeWhileMasterFailover() throws IOException,
-      KeeperException, InterruptedException {
-    LOG.info("Start testInstantSchemaChangeWhileMasterFailover()");
-    //Thread.sleep(5000);
-
-    final String tableName = "testInstantSchemaChangeWhileMasterFailover";
-    HTable ht = createTableAndValidate(tableName);
-    HColumnDescriptor hcd = new HColumnDescriptor("family2");
-    admin.addColumn(Bytes.toBytes(tableName), hcd);
-    // Kill the primary master now.
-    Thread.sleep(50);
-    miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
-
-    // It may not be possible for us to check the schema change status
-    // using waitForSchemaChangeProcess, as our ZK session in MasterSchemaChangeTracker will be
-    // lost when the master dies and hence may not be accurate. So relying on old-fashioned
-    // sleep here.
-    Thread.sleep(25000);
-    Put put2 = new Put(row);
-    put2.add(Bytes.toBytes("family2"), qualifier, value);
-    ht.put(put2);
-
-    Get get2 = new Get(row);
-    get2.addColumn(Bytes.toBytes("family2"), qualifier);
-    Result r2 = ht.get(get2);
-    byte[] tvalue2 = r2.getValue(Bytes.toBytes("family2"), qualifier);
-    int result2 = Bytes.compareTo(value, tvalue2);
-    assertEquals(result2, 0);
-    LOG.info("result2 = " + result2);
-    LOG.info("end testInstantSchemaChangeWhileMasterFailover()");
-    ht.close();
-  }
-
-  /**
-   * Test the master failover during a schema change request in ZK.
-   * We create a fake schema change request in ZK and abort the primary master
-   * mid-flight to simulate a master failover scenario during a mid-flight
-   * schema change process. The new master's schema janitor will eventually
-   * clean up this fake request after it times out.
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  @Ignore
-  @Test
-  public void testInstantSchemaOperationsInZKForMasterFailover() throws IOException,
-      KeeperException, InterruptedException {
-    LOG.info("testInstantSchemaOperationsInZKForMasterFailover() ");
-    String tableName = "testInstantSchemaOperationsInZKForMasterFailover";
-
-    conf = TEST_UTIL.getConfiguration();
-    MasterSchemaChangeTracker activesct =
-        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-    activesct.createSchemaChangeNode(tableName, 10);
-    LOG.debug(activesct.getSchemaChangeNodePathForTable(tableName)
-        + " created");
-    assertTrue(activesct.doesSchemaChangeNodeExists(tableName));
-    // Kill primary master now.
-    miniHBaseCluster.getMaster().abort("Aborting master now", new Exception("Schema exception"));
-    // wait for 50 secs. This is so that our schema janitor from fail-over master will kick-in and
-    // cleanup this failed/expired schema change request.
-    Thread.sleep(50000);
-    MasterSchemaChangeTracker newmsct = miniHBaseCluster.getMaster().getSchemaChangeTracker();
-    assertFalse(newmsct.doesSchemaChangeNodeExists(tableName));
-    LOG.debug(newmsct.getSchemaChangeNodePathForTable(tableName)
-        + " deleted");
-    LOG.info("END testInstantSchemaOperationsInZKForMasterFailover() ");
-  }
-
-  private HTable createTableAndValidate(String tableName) throws IOException {
-    conf = TEST_UTIL.getConfiguration();
-    LOG.info("Start createTableAndValidate()");
-    HTableDescriptor[] tables = admin.listTables();
-    int numTables = 0;
-    if (tables != null) {
-      numTables = tables.length;
-    }
-    HTable ht = TEST_UTIL.createTable(Bytes.toBytes(tableName),
-      HConstants.CATALOG_FAMILY);
-    tables = this.admin.listTables();
-    assertEquals(numTables + 1, tables.length);
-    LOG.info("created table = " + tableName);
-    return ht;
-  }
-
-
-  @org.junit.Rule
-  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
-    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
-}
-
-
-
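The waitForSchemaChangeProcess helper in the file above polls doesSchemaChangeNodeExists in a sleep loop, and its Javadoc concedes that listening for ZK events directly would be more reliable. Below is a small sketch of that watcher-based wait written against the plain ZooKeeper client API; the path and timeout are caller-supplied, and none of this code comes from the patch itself.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

/**
 * Event-driven alternative to a polling loop: block until the given znode is
 * deleted, re-arming a watch on every check instead of sleeping blindly.
 */
public class WaitForNodeDeletion {
  public static boolean awaitDeletion(ZooKeeper zk, String path, long timeoutMillis)
      throws KeeperException, InterruptedException {
    long deadline = System.currentTimeMillis() + timeoutMillis;
    while (System.currentTimeMillis() < deadline) {
      final CountDownLatch changed = new CountDownLatch(1);
      Watcher watcher = new Watcher() {
        @Override
        public void process(WatchedEvent event) {
          changed.countDown(); // any event on the path wakes the waiter
        }
      };
      if (zk.exists(path, watcher) == null) {
        return true; // node is already gone
      }
      long remaining = deadline - System.currentTimeMillis();
      if (remaining > 0) {
        changed.await(remaining, TimeUnit.MILLISECONDS);
      }
    }
    return zk.exists(path, false) == null;
  }
}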
diff --git a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeSplit.java b/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeSplit.java
deleted file mode 100644
index 8f3124b8ea4..00000000000
--- a/src/test/java/org/apache/hadoop/hbase/client/TestInstantSchemaChangeSplit.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hbase.client;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.hbase.HColumnDescriptor;
-import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.HTableDescriptor;
-import org.apache.hadoop.hbase.LargeTests;
-import org.apache.hadoop.hbase.io.hfile.Compression;
-import org.apache.hadoop.hbase.regionserver.HRegion;
-import org.apache.hadoop.hbase.regionserver.HRegionServer;
-import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.zookeeper.KeeperException;
-import org.junit.Test;
-import org.junit.experimental.categories.Category;
-
-@Category(LargeTests.class)
-public class TestInstantSchemaChangeSplit extends InstantSchemaChangeTestBase {
-
-  final Log LOG = LogFactory.getLog(getClass());
-
-  /**
-   * The objective of the following test is to validate that schema exclusions happen properly.
-   * When a RS dies or crashes mid-flight during a schema refresh, we exclude
-   * all online regions on that RS, as well as the RS itself, from the schema change process.
-   *
-   * @throws IOException
-   * @throws KeeperException
-   * @throws InterruptedException
-   */
-  @Test
-  public void testInstantSchemaChangeExclusions() throws IOException,
-      KeeperException, InterruptedException {
-    MasterSchemaChangeTracker msct =
-        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-    LOG.info("Start testInstantSchemaChangeExclusions() ");
-    String tableName = "testInstantSchemaChangeExclusions";
-    HTable ht = createTableAndValidate(tableName);
-
-    HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
-    hcd.setMaxVersions(99);
-    hcd.setBlockCacheEnabled(false);
-
-    HRegionServer hrs = findRSWithOnlineRegionFor(tableName);
-    //miniHBaseCluster.getRegionServer(0).abort("killed for test");
-    admin.modifyColumn(Bytes.toBytes(tableName), hcd);
-    hrs.abort("Aborting for tests");
-    hrs.getSchemaChangeTracker().setSleepTimeMillis(20000);
-
-    //admin.modifyColumn(Bytes.toBytes(tableName), hcd);
-    LOG.debug("Waiting for Schema Change process to complete");
-    waitForSchemaChangeProcess(tableName, 15000);
-    assertEquals(msct.doesSchemaChangeNodeExists(tableName), false);
-    // Sleep for some time so that our region is reassigned to some other RS
-    // by master.
-    Thread.sleep(10000);
-    List onlineRegions
-      = miniHBaseCluster.getRegions(Bytes.toBytes("testInstantSchemaChangeExclusions"));
-    assertTrue(!onlineRegions.isEmpty());
-    for (HRegion onlineRegion : onlineRegions) {
-      HTableDescriptor htd = onlineRegion.getTableDesc();
-      HColumnDescriptor tableHcd = htd.getFamily(HConstants.CATALOG_FAMILY);
-      assertTrue(tableHcd.isBlockCacheEnabled() == false);
-      assertEquals(tableHcd.getMaxVersions(), 99);
-    }
-    LOG.info("End testInstantSchemaChangeExclusions() ");
-    ht.close();
-  }
-
-  /**
-   * This test validates that when a schema change request fails on the
-   * RS side, we appropriately register the failure in the Master Schema change
-   * tracker's node as well as capture the error cause.
-   *
-   * Currently an alter request fails if the RS fails with an IO exception, say due to a
-   * missing or incorrect codec. With instant schema change the same failure happens,
-   * and we register the failure with the associated cause and also update the
-   * monitor status appropriately.
-   *
-   * The region(s) will be orphaned in both cases.
-   *
-   */
-  @Test
-  public void testInstantSchemaChangeWhileRSOpenRegionFailure() throws IOException,
-      KeeperException, InterruptedException {
-    MasterSchemaChangeTracker msct =
-        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-
-    LOG.info("Start testInstantSchemaChangeWhileRSOpenRegionFailure() ");
-    String tableName = "testInstantSchemaChangeWhileRSOpenRegionFailure";
-    HTable ht = createTableAndValidate(tableName);
-
-    // create now 100 regions
-    TEST_UTIL.createMultiRegions(conf, ht,
-      HConstants.CATALOG_FAMILY, 10);
-
-    // wait for all the regions to be assigned
-    Thread.sleep(10000);
-    List onlineRegions
-      = miniHBaseCluster.getRegions(
-        Bytes.toBytes("testInstantSchemaChangeWhileRSOpenRegionFailure"));
-    int size = onlineRegions.size();
-    // we will not have any online regions
-    LOG.info("Size of online regions = " + onlineRegions.size());
-
-    HColumnDescriptor hcd = new HColumnDescriptor(HConstants.CATALOG_FAMILY);
-    hcd.setMaxVersions(99);
-    hcd.setBlockCacheEnabled(false);
-    hcd.setCompressionType(Compression.Algorithm.SNAPPY);
-
-    admin.modifyColumn(Bytes.toBytes(tableName), hcd);
-    Thread.sleep(100);
-
-    assertEquals(msct.doesSchemaChangeNodeExists(tableName), true);
-    Thread.sleep(10000);
-    // get the current alter status and validate that it is a failure with an appropriate error msg.
-    MasterSchemaChangeTracker.MasterAlterStatus mas = msct.getMasterAlterStatus(tableName);
-    assertTrue(mas != null);
-    assertEquals(mas.getCurrentAlterStatus(),
-      MasterSchemaChangeTracker.MasterAlterStatus.AlterState.FAILURE);
-    assertTrue(mas.getErrorCause() != null);
-    LOG.info("End testInstantSchemaChangeWhileRSOpenRegionFailure() ");
-    ht.close();
-  }
-
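The Javadoc above describes the bookkeeping the test just shown asserts: when the alter fails on the RS side, the master-side tracker flips its alter status to FAILURE and records an error cause. The real MasterSchemaChangeTracker.MasterAlterStatus is not included in this hunk, so the following is only a sketch of that record's shape; the method names mirror the assertions in the test, while the enum constants other than FAILURE and everything else here are illustrative.

/**
 * Sketch of a master-side alter status record: a terminal state plus the
 * cause reported by the failing region server.
 */
public class AlterStatusSketch {
  public enum AlterState { INPROCESS, SUCCESS, FAILURE }

  private volatile AlterState currentAlterStatus = AlterState.INPROCESS;
  private volatile String errorCause;

  /** Called when a region server reports that applying the schema change failed. */
  public void markFailed(String regionServer, Throwable cause) {
    this.currentAlterStatus = AlterState.FAILURE;
    this.errorCause = "Schema change failed on " + regionServer + ": " + cause;
  }

  public AlterState getCurrentAlterStatus() {
    return currentAlterStatus;
  }

  public String getErrorCause() {
    return errorCause;
  }
}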
-  @Test
-  public void testConcurrentInstantSchemaChangeAndSplit() throws IOException,
-      InterruptedException, KeeperException {
-    final String tableName = "testConcurrentInstantSchemaChangeAndSplit";
-    conf = TEST_UTIL.getConfiguration();
-    LOG.info("Start testConcurrentInstantSchemaChangeAndSplit()");
-    final String newFamily = "newFamily";
-    HTable ht = createTableAndValidate(tableName);
-    final MasterSchemaChangeTracker msct =
-        TEST_UTIL.getHBaseCluster().getMaster().getSchemaChangeTracker();
-
-    // create now 10 regions
-    TEST_UTIL.createMultiRegions(conf, ht,
-      HConstants.CATALOG_FAMILY, 4);
-    int rowCount = TEST_UTIL.loadTable(ht, HConstants.CATALOG_FAMILY);
-    //assertRowCount(t, rowCount);
-
-    Runnable splitter = new Runnable() {
-      public void run() {
-        // run the splits now.
-        try {
-          LOG.info("Splitting table now ");
-          admin.split(Bytes.toBytes(tableName));
-        } catch (IOException e) {
-          e.printStackTrace();
-        } catch (InterruptedException e) {
-          e.printStackTrace();
-        }
-      }
-    };
-
-    Runnable schemaChanger = new Runnable() {
-      public void run() {
-        HColumnDescriptor hcd = new HColumnDescriptor(newFamily);
-        try {
-          admin.addColumn(Bytes.toBytes(tableName), hcd);
-        } catch (IOException ioe) {
-          ioe.printStackTrace();
-
-        }
-      }
-    };
-    schemaChanger.run();
-    Thread.sleep(50);
-    splitter.run();
-    waitForSchemaChangeProcess(tableName, 40000);
-
-    Put put1 = new Put(row);
-    put1.add(Bytes.toBytes(newFamily), qualifier, value);
-    LOG.info("******** Put into new column family ");
-    ht.put(put1);
-    ht.flushCommits();
-
-    LOG.info("******** Get from new column family ");
-    Get get1 = new Get(row);
-    get1.addColumn(Bytes.toBytes(newFamily), qualifier);
-    Result r = ht.get(get1);
-    byte[] tvalue = r.getValue(Bytes.toBytes(newFamily), qualifier);
-    LOG.info(" Value put = " + value + " value from table = " + tvalue);
-    int result = Bytes.compareTo(value, tvalue);
-    assertEquals(result, 0);
-    LOG.info("End testConcurrentInstantSchemaChangeAndSplit() ");
-    ht.close();
-  }
-
-
-
-  @org.junit.Rule
-  public org.apache.hadoop.hbase.ResourceCheckerJUnitRule cu =
-    new org.apache.hadoop.hbase.ResourceCheckerJUnitRule();
-}
-
-
-
diff --git a/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java b/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
index d2b30605f6c..41616c8bf49 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/MockRegionServer.java
@@ -360,6 +360,12 @@ class MockRegionServer implements HRegionInterface, RegionServerServices {
     return null;
   }
 
+  @Override
+  public List getOnlineRegions(byte[] tableName) throws IOException {
+    // TODO Auto-generated method stub
+    return null;
+  }
+
   @Override
   public HServerInfo getHServerInfo() throws IOException {
     // TODO Auto-generated method stub
@@ -513,17 +519,6 @@ class MockRegionServer implements HRegionInterface, RegionServerServices {
     return null;
   }
 
-  @Override
-  public List getOnlineRegions(byte[] tableName) throws IOException {
-    // TODO Auto-generated method stub
-    return null;
-  }
-
-  @Override
-  public void refreshRegion(HRegion hRegion) throws IOException {
-    // TODO Auto-generated method stub
-  }
-
   @Override
   public Configuration getConfiguration() {
     return this.conf;
diff --git a/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java b/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
index 227c5f28446..b4dcb83b900 100644
--- a/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
+++ b/src/test/java/org/apache/hadoop/hbase/master/TestCatalogJanitor.java
@@ -51,15 +51,12 @@ import org.apache.hadoop.hbase.client.HConnection;
 import org.apache.hadoop.hbase.client.HConnectionManager;
 import org.apache.hadoop.hbase.client.HConnectionTestingUtility;
 import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.executor.EventHandler;
 import org.apache.hadoop.hbase.executor.ExecutorService;
 import org.apache.hadoop.hbase.io.Reference;
 import org.apache.hadoop.hbase.ipc.HRegionInterface;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Writables;
-import org.apache.hadoop.hbase.zookeeper.MasterSchemaChangeTracker;
-import org.apache.hadoop.hbase.zookeeper.RegionServerTracker;
 import org.apache.hadoop.hbase.zookeeper.ZooKeeperWatcher;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -155,6 +152,11 @@ public class TestCatalogJanitor {
       this.asm = Mockito.mock(AssignmentManager.class);
     }
 
+    @Override
+    public void checkTableModifiable(byte[] tableName) throws IOException {
+      //no-op
+    }
+
     @Override
     public void createTable(HTableDescriptor desc, byte[][] splitKeys)
         throws IOException {
@@ -171,11 +173,6 @@ public class TestCatalogJanitor {
       return null;
     }
 
-    public void checkTableModifiable(byte[] tableName,
-        EventHandler.EventType eventType)
-        throws IOException {
-    }
-
     @Override
     public MasterFileSystem getMasterFileSystem() {
       return this.mfs;
@@ -263,14 +260,6 @@ public class TestCatalogJanitor {
       };
     }
 
-    public MasterSchemaChangeTracker getSchemaChangeTracker() {
-      return null;
-    }
-
-    public RegionServerTracker getRegionServerTracker() {
-      return null;
-    }
-
     @Override
     public boolean isServerShutdownHandlerEnabled() {
       return true;
diff --git a/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java b/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
index bb3ddd728a7..7d0275960cf 100644
--- a/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
+++ b/src/test/java/org/apache/hadoop/hbase/util/MockRegionServerServices.java
@@ -63,9 +63,6 @@ public class MockRegionServerServices implements RegionServerServices {
     return null;
   }
 
-  public void refreshRegion(HRegion hRegion) throws IOException {
-  }
-
   @Override
   public void addToOnlineRegions(HRegion r) {
     this.regions.put(r.getRegionInfo().getEncodedName(), r);