HBASE-1730 Online schema changes for HBase
git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1164797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d0e61e30f1
commit
236709bcb1
|
@ -438,6 +438,7 @@ Release 0.91.0 - Unreleased
|
|||
(Lars Hofhansl)
|
||||
HBASE-4257 Limit the number of regions in transitions displayed on
|
||||
master webpage. (todd)
|
||||
HBASE-1730 Online Schema Changes
|
||||
|
||||
TASKS
|
||||
HBASE-3559 Move report of split to master OFF the heartbeat channel
|
||||
|
|
|
@ -846,6 +846,28 @@ public class HBaseAdmin implements Abortable, Closeable {
|
|||
return connection.isTableAvailable(Bytes.toBytes(tableName));
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the status of alter command - indicates how many regions have received
|
||||
* the updated schema Asynchronous operation.
|
||||
*
|
||||
* @param tableName
|
||||
* name of the table to get the status of
|
||||
* @return Pair indicating the number of regions updated Pair.getFirst() is the
|
||||
* regions that are yet to be updated Pair.getSecond() is the total number
|
||||
* of regions of the table
|
||||
* @throws IOException
|
||||
* if a remote or network exception occurs
|
||||
*/
|
||||
public Pair<Integer, Integer> getAlterStatus(final byte[] tableName)
|
||||
throws IOException {
|
||||
HTableDescriptor.isLegalTableName(tableName);
|
||||
try {
|
||||
return getMaster().getAlterStatus(tableName);
|
||||
} catch (RemoteException e) {
|
||||
throw RemoteExceptionHandler.decodeRemoteException(e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a column to an existing table.
|
||||
* Asynchronous operation.
|
||||
|
|
|
@ -140,6 +140,14 @@ public abstract class EventHandler implements Runnable, Comparable<Runnable> {
|
|||
* Constructor
|
||||
*/
|
||||
EventType(int value) {}
|
||||
public boolean isOnlineSchemaChangeSupported() {
|
||||
return (
|
||||
this.equals(EventType.C_M_ADD_FAMILY) ||
|
||||
this.equals(EventType.C_M_DELETE_FAMILY) ||
|
||||
this.equals(EventType.C_M_MODIFY_FAMILY) ||
|
||||
this.equals(EventType.C_M_MODIFY_TABLE)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -26,6 +26,7 @@ import org.apache.hadoop.hbase.ClusterStatus;
|
|||
import org.apache.hadoop.hbase.HColumnDescriptor;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.UnknownRegionException;
|
||||
import org.apache.hadoop.hbase.util.Pair;
|
||||
import org.apache.hadoop.hbase.ipc.VersionedProtocol;
|
||||
|
||||
/**
|
||||
|
@ -71,6 +72,19 @@ public interface HMasterInterface extends VersionedProtocol {
|
|||
*/
|
||||
public void deleteTable(final byte [] tableName) throws IOException;
|
||||
|
||||
/**
|
||||
* Used by the client to get the number of regions that have received the
|
||||
* updated schema
|
||||
*
|
||||
* @param tableName
|
||||
* @return Pair indicating the number of regions updated Pair.getFirst() is the
|
||||
* regions that are yet to be updated Pair.getSecond() is the total number
|
||||
* of regions of the table
|
||||
* @throws IOException
|
||||
*/
|
||||
public Pair<Integer, Integer> getAlterStatus(byte[] tableName)
|
||||
throws IOException;
|
||||
|
||||
/**
|
||||
* Adds a column to the specified table
|
||||
* @param tableName table to modify
|
||||
|
@ -222,4 +236,4 @@ public interface HMasterInterface extends VersionedProtocol {
|
|||
*/
|
||||
public HTableDescriptor[] getHTableDescriptors(List<String> tableNames);
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
@ -25,6 +25,7 @@ import java.io.IOException;
|
|||
import java.lang.Thread.UncaughtExceptionHandler;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Date;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.Iterator;
|
||||
|
@ -104,6 +105,12 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
|
||||
private LoadBalancer balancer;
|
||||
|
||||
/**
|
||||
* Map of regions to reopen after the schema of a table is changed. Key -
|
||||
* encoded region name, value - HRegionInfo
|
||||
*/
|
||||
private final Map <String, HRegionInfo> regionsToReopen;
|
||||
|
||||
/*
|
||||
* Maximum times we recurse an assignment. See below in {@link #assign()}.
|
||||
*/
|
||||
|
@ -165,6 +172,8 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
this.serverManager = serverManager;
|
||||
this.catalogTracker = catalogTracker;
|
||||
this.executorService = service;
|
||||
this.regionsToReopen = Collections.synchronizedMap
|
||||
(new HashMap<String, HRegionInfo> ());
|
||||
Configuration conf = master.getConfiguration();
|
||||
this.timeoutMonitor = new TimeoutMonitor(
|
||||
conf.getInt("hbase.master.assignment.timeoutmonitor.period", 10000),
|
||||
|
@ -206,7 +215,60 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// sharing.
|
||||
return this.zkTable;
|
||||
}
|
||||
/**
|
||||
* Returns the RegionServer to which hri is assigned.
|
||||
*
|
||||
* @param hri
|
||||
* HRegion for which this function returns the region server
|
||||
* @return HServerInfo The region server to which hri belongs
|
||||
*/
|
||||
public ServerName getRegionServerOfRegion(HRegionInfo hri) {
|
||||
synchronized (this.regions ) {
|
||||
return regions.get(hri);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a regionPlan for the specified region.
|
||||
*/
|
||||
public void addPlan(String encodedName, RegionPlan plan) {
|
||||
synchronized (regionPlans) {
|
||||
regionPlans.put(encodedName, plan);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the list of regions that will be reopened
|
||||
* because of an update in table schema
|
||||
*
|
||||
* @param regions
|
||||
* list of regions that should be tracked for reopen
|
||||
*/
|
||||
public void setRegionsToReopen(List <HRegionInfo> regions) {
|
||||
for(HRegionInfo hri : regions) {
|
||||
regionsToReopen.put(hri.getEncodedName(), hri);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Used by the client to identify if all regions have the schema updates
|
||||
*
|
||||
* @param tableName
|
||||
* @return Pair indicating the status of the alter command
|
||||
* @throws IOException
|
||||
*/
|
||||
public Pair<Integer, Integer> getReopenStatus(byte[] tableName)
|
||||
throws IOException {
|
||||
List <HRegionInfo> hris = MetaReader.getTableRegions(
|
||||
this.master.getCatalogTracker(), tableName);
|
||||
Integer pending = 0;
|
||||
for(HRegionInfo hri : hris) {
|
||||
if(regionsToReopen.get(hri.getEncodedName()) != null) {
|
||||
pending++;
|
||||
}
|
||||
}
|
||||
return new Pair<Integer, Integer>(pending, hris.size());
|
||||
}
|
||||
/**
|
||||
* Reset all unassigned znodes. Called on startup of master.
|
||||
* Call {@link #assignAllUserRegions()} after root and meta have been assigned.
|
||||
|
@ -466,6 +528,18 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
new ClosedRegionHandler(this.master, this, hri).process();
|
||||
}
|
||||
|
||||
/**
|
||||
* When a region is closed, it should be removed from the regionsToReopen
|
||||
* @param hri HRegionInfo of the region which was closed
|
||||
*/
|
||||
public void removeClosedRegion(HRegionInfo hri) {
|
||||
if (!regionsToReopen.isEmpty()) {
|
||||
if (regionsToReopen.remove(hri.getEncodedName()) != null) {
|
||||
LOG.debug("Removed region from reopening regions because it was closed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param regionInfo
|
||||
* @param deadServers Map of deadServers and the regions they were carrying;
|
||||
|
@ -601,6 +675,7 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
// what follows will fail because not in expected state.
|
||||
regionState.update(RegionState.State.CLOSED,
|
||||
data.getStamp(), data.getOrigin());
|
||||
removeClosedRegion(regionState.getRegion());
|
||||
this.executorService.submit(new ClosedRegionHandler(master,
|
||||
this, regionState.getRegion()));
|
||||
break;
|
||||
|
@ -1390,6 +1465,11 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
RegionPlan existingPlan = null;
|
||||
synchronized (this.regionPlans) {
|
||||
existingPlan = this.regionPlans.get(encodedName);
|
||||
if (existingPlan != null && existingPlan.getDestination() != null) {
|
||||
LOG.debug("Found an existing plan for " +
|
||||
state.getRegion().getRegionNameAsString() +
|
||||
" destination server is + " + existingPlan.getDestination().toString());
|
||||
}
|
||||
if (forceNewPlan || existingPlan == null
|
||||
|| existingPlan.getDestination() == null
|
||||
|| existingPlan.getDestination().equals(serverToExclude)) {
|
||||
|
@ -1411,6 +1491,37 @@ public class AssignmentManager extends ZooKeeperListener {
|
|||
return existingPlan;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unassign the list of regions. Configuration knobs:
|
||||
* hbase.bulk.waitbetween.reopen indicates the number of milliseconds to
|
||||
* wait before unassigning another region from this region server
|
||||
*
|
||||
* @param regions
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
public void unassign(List<HRegionInfo> regions) {
|
||||
int waitTime = this.master.getConfiguration().getInt(
|
||||
"hbase.bulk.waitbetween.reopen", 0);
|
||||
for (HRegionInfo region : regions) {
|
||||
if (isRegionInTransition(region) != null)
|
||||
continue;
|
||||
unassign(region, false);
|
||||
while (isRegionInTransition(region) != null) {
|
||||
try {
|
||||
Thread.sleep(10);
|
||||
} catch (InterruptedException e) {
|
||||
// Do nothing, continue
|
||||
}
|
||||
}
|
||||
if (waitTime > 0)
|
||||
try {
|
||||
Thread.sleep(waitTime);
|
||||
} catch (InterruptedException e) {
|
||||
// Do nothing, continue
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Unassigns the specified region.
|
||||
* <p>
|
||||
|
|
|
@ -0,0 +1,99 @@
|
|||
/**
|
||||
* Copyright 2010 The Apache Software Foundation
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.master;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.BulkAssigner;
|
||||
import org.apache.hadoop.hbase.master.RegionPlan;
|
||||
import org.apache.commons.logging.Log;
|
||||
|
||||
/**
|
||||
* Performs bulk reopen of the list of regions provided to it.
|
||||
*/
|
||||
public class BulkReOpen extends BulkAssigner {
|
||||
private final Map<ServerName, List<HRegionInfo>> rsToRegions;
|
||||
private final AssignmentManager assignmentManager;
|
||||
private static final Log LOG = LogFactory.getLog(BulkReOpen.class);
|
||||
|
||||
public BulkReOpen(final Server server,
|
||||
final Map<ServerName, List<HRegionInfo>> serverToRegions,
|
||||
final AssignmentManager am) {
|
||||
super(server);
|
||||
this.assignmentManager = am;
|
||||
this.rsToRegions = serverToRegions;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unassign all regions, so that they go through the regular region
|
||||
* assignment flow (in assignment manager) and are re-opened.
|
||||
*/
|
||||
@Override
|
||||
protected void populatePool(ExecutorService pool) {
|
||||
LOG.debug("Creating threads for each region server ");
|
||||
for (Map.Entry<ServerName, List<HRegionInfo>> e : rsToRegions
|
||||
.entrySet()) {
|
||||
final List<HRegionInfo> hris = e.getValue();
|
||||
// add a plan for each of the regions that needs to be reopened
|
||||
for (HRegionInfo hri : hris) {
|
||||
RegionPlan reOpenPlan = new RegionPlan(hri, null,
|
||||
assignmentManager.getRegionServerOfRegion(hri));
|
||||
assignmentManager.addPlan(hri.getEncodedName(), reOpenPlan);
|
||||
}
|
||||
pool.execute(new Runnable() {
|
||||
public void run() {
|
||||
assignmentManager.unassign(hris);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Reopen the regions asynchronously, so always returns true immediately.
|
||||
* @return true
|
||||
*/
|
||||
@Override
|
||||
protected boolean waitUntilDone(long timeout) {
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Configuration knobs "hbase.bulk.reopen.threadpool.size" number of regions
|
||||
* that can be reopened concurrently. The maximum number of threads the master
|
||||
* creates is never more than the number of region servers.
|
||||
* If configuration is not defined it defaults to 20
|
||||
*/
|
||||
protected int getThreadCount() {
|
||||
int defaultThreadCount = super.getThreadCount();
|
||||
return this.server.getConfiguration().getInt(
|
||||
"hbase.bulk.reopen.threadpool.size", defaultThreadCount);
|
||||
}
|
||||
|
||||
public boolean bulkReOpen() throws InterruptedException {
|
||||
return bulkAssign();
|
||||
}
|
||||
}
|
|
@ -991,6 +991,18 @@ implements HMasterInterface, HMasterRegionInterface, MasterServices, Server {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of regions of the table that have been updated by the alter.
|
||||
*
|
||||
* @return Pair indicating the number of regions updated Pair.getFirst is the
|
||||
* regions that are yet to be updated Pair.getSecond is the total number
|
||||
* of regions of the table
|
||||
*/
|
||||
public Pair<Integer, Integer> getAlterStatus(byte[] tableName)
|
||||
throws IOException {
|
||||
return this.assignmentManager.getReopenStatus(tableName);
|
||||
}
|
||||
|
||||
public void addColumn(byte [] tableName, HColumnDescriptor column)
|
||||
throws IOException {
|
||||
if (cpHost != null) {
|
||||
|
|
|
@ -98,6 +98,7 @@ public class ClosedRegionHandler extends EventHandler implements TotesHRegionInf
|
|||
}
|
||||
// ZK Node is in CLOSED state, assign it.
|
||||
assignmentManager.setOffline(regionInfo);
|
||||
assignmentManager.removeClosedRegion(regionInfo);
|
||||
assignmentManager.assign(regionInfo, true);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -27,7 +27,6 @@ import org.apache.hadoop.hbase.HRegionInfo;
|
|||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.InvalidFamilyOperationException;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.master.AssignmentManager;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
||||
|
@ -47,8 +46,8 @@ public class TableAddFamilyHandler extends TableEventHandler {
|
|||
@Override
|
||||
protected void handleTableOperation(List<HRegionInfo> hris)
|
||||
throws IOException {
|
||||
AssignmentManager am = this.masterServices.getAssignmentManager();
|
||||
HTableDescriptor htd = this.masterServices.getTableDescriptors().get(Bytes.toString(tableName));
|
||||
HTableDescriptor htd = this.masterServices.getTableDescriptors().
|
||||
get(Bytes.toString(tableName));
|
||||
byte [] familyName = familyDesc.getName();
|
||||
if (htd == null) {
|
||||
throw new IOException("Add Family operation could not be completed as " +
|
||||
|
@ -76,7 +75,8 @@ public class TableAddFamilyHandler extends TableEventHandler {
|
|||
if(familyDesc != null) {
|
||||
family = familyDesc.getNameAsString();
|
||||
}
|
||||
return getClass().getSimpleName() + "-" + name + "-" + getSeqid() + "-" + tableNameStr + "-" + family;
|
||||
return getClass().getSimpleName() + "-" + name + "-" +
|
||||
getSeqid() + "-" + tableNameStr + "-" + family;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -20,18 +20,28 @@
|
|||
package org.apache.hadoop.hbase.master.handler;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
import java.util.NavigableMap;
|
||||
import java.util.TreeMap;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.Server;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.catalog.MetaReader;
|
||||
import org.apache.hadoop.hbase.client.HTable;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.master.BulkReOpen;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
|
||||
/**
|
||||
* Base class for performing operations against tables.
|
||||
* Checks on whether the process can go forward are done in constructor rather
|
||||
|
@ -51,7 +61,16 @@ public abstract class TableEventHandler extends EventHandler {
|
|||
super(server, eventType);
|
||||
this.masterServices = masterServices;
|
||||
this.tableName = tableName;
|
||||
this.masterServices.checkTableModifiable(tableName);
|
||||
try {
|
||||
this.masterServices.checkTableModifiable(tableName);
|
||||
} catch (TableNotDisabledException ex) {
|
||||
if (eventType.isOnlineSchemaChangeSupported()) {
|
||||
LOG.debug("Ignoring table not disabled exception " +
|
||||
"for supporting online schema changes.");
|
||||
} else {
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
this.tableNameStr = Bytes.toString(this.tableName);
|
||||
}
|
||||
|
||||
|
@ -64,6 +83,17 @@ public abstract class TableEventHandler extends EventHandler {
|
|||
MetaReader.getTableRegions(this.server.getCatalogTracker(),
|
||||
tableName);
|
||||
handleTableOperation(hris);
|
||||
if (eventType.isOnlineSchemaChangeSupported() && this.masterServices.
|
||||
getAssignmentManager().getZKTable().
|
||||
isEnabledTable(Bytes.toString(tableName))) {
|
||||
this.masterServices.getAssignmentManager().setRegionsToReopen(hris);
|
||||
if (reOpenAllRegions(hris)) {
|
||||
LOG.info("Completed table operation " + eventType + " on table " +
|
||||
Bytes.toString(tableName));
|
||||
} else {
|
||||
LOG.warn("Error on reopening the regions");
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.error("Error manipulating table " + Bytes.toString(tableName), e);
|
||||
} catch (KeeperException e) {
|
||||
|
@ -71,6 +101,43 @@ public abstract class TableEventHandler extends EventHandler {
|
|||
}
|
||||
}
|
||||
|
||||
public boolean reOpenAllRegions(List<HRegionInfo> regions) throws IOException {
|
||||
boolean done = false;
|
||||
LOG.info("Bucketing regions by region server...");
|
||||
HTable table = new HTable(masterServices.getConfiguration(), tableName);
|
||||
TreeMap<ServerName, List<HRegionInfo>> serverToRegions = Maps
|
||||
.newTreeMap();
|
||||
NavigableMap<HRegionInfo, ServerName> hriHserverMapping = table.getRegionLocations();
|
||||
|
||||
for (HRegionInfo hri : regions) {
|
||||
ServerName rsLocation = hriHserverMapping.get(hri);
|
||||
if (!serverToRegions.containsKey(rsLocation)) {
|
||||
LinkedList<HRegionInfo> hriList = Lists.newLinkedList();
|
||||
serverToRegions.put(rsLocation, hriList);
|
||||
}
|
||||
serverToRegions.get(rsLocation).add(hri);
|
||||
}
|
||||
LOG.info("Reopening " + regions.size() + " regions on "
|
||||
+ serverToRegions.size() + " region servers.");
|
||||
BulkReOpen bulkReopen = new BulkReOpen(this.server, serverToRegions,
|
||||
this.masterServices.getAssignmentManager());
|
||||
while (true) {
|
||||
try {
|
||||
if (bulkReopen.bulkReOpen()) {
|
||||
done = true;
|
||||
break;
|
||||
} else {
|
||||
LOG.warn("Timeout before reopening all regions");
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
LOG.warn("Reopen was interrupted");
|
||||
// Preserve the interrupt.
|
||||
Thread.currentThread().interrupt();
|
||||
break;
|
||||
}
|
||||
}
|
||||
return done;
|
||||
}
|
||||
protected abstract void handleTableOperation(List<HRegionInfo> regions)
|
||||
throws IOException, KeeperException;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#
|
||||
|
||||
include Java
|
||||
java_import org.apache.hadoop.hbase.util.Pair
|
||||
|
||||
# Wrapper for org.apache.hadoop.hbase.client.HBaseAdmin
|
||||
|
||||
|
@ -270,16 +271,31 @@ module Hbase
|
|||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Change table structure or table options
|
||||
def alter(table_name, *args)
|
||||
# Check the status of alter command (number of regions reopened)
|
||||
def alter_status(table_name)
|
||||
# Table name should be a string
|
||||
raise(ArgumentError, "Table name must be of type String") unless table_name.kind_of?(String)
|
||||
|
||||
# Table should exist
|
||||
raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name)
|
||||
|
||||
# Table should be disabled
|
||||
raise(ArgumentError, "Table #{table_name} is enabled. Disable it first before altering.") if enabled?(table_name)
|
||||
status = Pair.new()
|
||||
begin
|
||||
status = @admin.getAlterStatus(table_name.to_java_bytes)
|
||||
puts "#{status.getSecond() - status.getFirst()}/#{status.getSecond()} regions updated."
|
||||
sleep 1
|
||||
end while status != nil && status.getFirst() != 0
|
||||
puts "Done."
|
||||
end
|
||||
|
||||
#----------------------------------------------------------------------------------------------
|
||||
# Change table structure or table options
|
||||
def alter(table_name, wait = true, *args)
|
||||
# Table name should be a string
|
||||
raise(ArgumentError, "Table name must be of type String") unless table_name.kind_of?(String)
|
||||
|
||||
# Table should exist
|
||||
raise(ArgumentError, "Can't find a table: #{table_name}") unless exists?(table_name)
|
||||
|
||||
# There should be at least one argument
|
||||
raise(ArgumentError, "There should be at least one argument but the table name") if args.empty?
|
||||
|
@ -306,8 +322,16 @@ module Hbase
|
|||
# If column already exist, then try to alter it. Create otherwise.
|
||||
if htd.hasFamily(column_name.to_java_bytes)
|
||||
@admin.modifyColumn(table_name, column_name, descriptor)
|
||||
if wait == true
|
||||
puts "Updating all regions with the new schema..."
|
||||
alter_status(table_name)
|
||||
end
|
||||
else
|
||||
@admin.addColumn(table_name, descriptor)
|
||||
if wait == true
|
||||
puts "Updating all regions with the new schema..."
|
||||
alter_status(table_name)
|
||||
end
|
||||
end
|
||||
next
|
||||
end
|
||||
|
@ -316,6 +340,10 @@ module Hbase
|
|||
if method == "delete"
|
||||
raise(ArgumentError, "NAME parameter missing for delete method") unless arg[NAME]
|
||||
@admin.deleteColumn(table_name, arg[NAME])
|
||||
if wait == true
|
||||
puts "Updating all regions with the new schema..."
|
||||
alter_status(table_name)
|
||||
end
|
||||
next
|
||||
end
|
||||
|
||||
|
@ -326,6 +354,10 @@ module Hbase
|
|||
htd.setMemStoreFlushSize(JLong.valueOf(arg[MEMSTORE_FLUSHSIZE])) if arg[MEMSTORE_FLUSHSIZE]
|
||||
htd.setDeferredLogFlush(JBoolean.valueOf(arg[DEFERRED_LOG_FLUSH])) if arg[DEFERRED_LOG_FLUSH]
|
||||
@admin.modifyTable(table_name.to_java_bytes, htd)
|
||||
if wait == true
|
||||
puts "Updating all regions with the new schema..."
|
||||
alter_status(table_name)
|
||||
end
|
||||
next
|
||||
end
|
||||
|
||||
|
|
|
@ -228,6 +228,8 @@ Shell.load_command_group(
|
|||
exists
|
||||
list
|
||||
show_filters
|
||||
alter_status
|
||||
alter_async
|
||||
]
|
||||
)
|
||||
|
||||
|
|
|
@ -56,7 +56,7 @@ EOF
|
|||
|
||||
def command(table, *args)
|
||||
format_simple_command do
|
||||
admin.alter(table, *args)
|
||||
admin.alter(table, true, *args)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
@ -0,0 +1,66 @@
|
|||
#
|
||||
# Copyright 2010 The Apache Software Foundation
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class AlterAsync < Command
|
||||
def help
|
||||
return <<-EOF
|
||||
Alter column family schema, does not wait for all regions to receive the
|
||||
schema changes. Pass table name and a dictionary specifying new column
|
||||
family schema. Dictionaries are described on the main help command output.
|
||||
Dictionary must include name of column family to alter. For example,
|
||||
|
||||
To change or add the 'f1' column family in table 't1' from defaults
|
||||
to instead keep a maximum of 5 cell VERSIONS, do:
|
||||
|
||||
hbase> alter_async 't1', NAME => 'f1', VERSIONS => 5
|
||||
|
||||
To delete the 'f1' column family in table 't1', do:
|
||||
|
||||
hbase> alter_async 't1', NAME => 'f1', METHOD => 'delete'
|
||||
|
||||
or a shorter version:
|
||||
|
||||
hbase> alter_async 't1', 'delete' => 'f1'
|
||||
|
||||
You can also change table-scope attributes like MAX_FILESIZE
|
||||
MEMSTORE_FLUSHSIZE, READONLY, and DEFERRED_LOG_FLUSH.
|
||||
|
||||
For example, to change the max size of a family to 128MB, do:
|
||||
|
||||
hbase> alter 't1', METHOD => 'table_att', MAX_FILESIZE => '134217728'
|
||||
|
||||
There could be more than one alteration in one command:
|
||||
|
||||
hbase> alter 't1', {NAME => 'f1'}, {NAME => 'f2', METHOD => 'delete'}
|
||||
|
||||
To check if all the regions have been updated, use alter_status <table_name>
|
||||
EOF
|
||||
end
|
||||
|
||||
def command(table, *args)
|
||||
format_simple_command do
|
||||
admin.alter(table, false, *args)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -0,0 +1,38 @@
|
|||
#
|
||||
# Copyright 2010 The Apache Software Foundation
|
||||
#
|
||||
# Licensed to the Apache Software Foundation (ASF) under one
|
||||
# or more contributor license agreements. See the NOTICE file
|
||||
# distributed with this work for additional information
|
||||
# regarding copyright ownership. The ASF licenses this file
|
||||
# to you under the Apache License, Version 2.0 (the
|
||||
# "License"); you may not use this file except in compliance
|
||||
# with the License. You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
|
||||
module Shell
|
||||
module Commands
|
||||
class AlterStatus < Command
|
||||
def help
|
||||
return <<-EOF
|
||||
Get the status of the alter command. Indicates the number of regions of the
|
||||
table that have received the updated schema
|
||||
Pass table name.
|
||||
|
||||
hbase> alter_status 't1'
|
||||
EOF
|
||||
end
|
||||
def command(table)
|
||||
admin.alter_status(table)
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
|
@ -42,8 +42,10 @@ import org.apache.hadoop.hbase.HColumnDescriptor;
|
|||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.HRegionInfo;
|
||||
import org.apache.hadoop.hbase.HServerAddress;
|
||||
import org.apache.hadoop.hbase.HServerInfo;
|
||||
import org.apache.hadoop.hbase.HTableDescriptor;
|
||||
import org.apache.hadoop.hbase.NotServingRegionException;
|
||||
import org.apache.hadoop.hbase.ServerName;
|
||||
import org.apache.hadoop.hbase.TableExistsException;
|
||||
import org.apache.hadoop.hbase.TableNotDisabledException;
|
||||
import org.apache.hadoop.hbase.TableNotEnabledException;
|
||||
|
@ -51,6 +53,7 @@ import org.apache.hadoop.hbase.TableNotFoundException;
|
|||
import org.apache.hadoop.hbase.executor.EventHandler;
|
||||
import org.apache.hadoop.hbase.executor.EventHandler.EventType;
|
||||
import org.apache.hadoop.hbase.executor.ExecutorService;
|
||||
import org.apache.hadoop.hbase.ipc.HRegionInterface;
|
||||
import org.apache.hadoop.hbase.master.MasterServices;
|
||||
import org.apache.hadoop.hbase.regionserver.HRegionServer;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -217,11 +220,13 @@ public class TestAdmin {
|
|||
/**
|
||||
* Verify schema modification takes.
|
||||
* @throws IOException
|
||||
* @throws InterruptedException
|
||||
*/
|
||||
@Test
|
||||
public void testChangeTableSchema() throws IOException {
|
||||
final byte [] tableName = Bytes.toBytes("changeTableSchema");
|
||||
@Test
|
||||
public void testOnlineChangeTableSchema() throws IOException, InterruptedException {
|
||||
final byte [] tableName = Bytes.toBytes("changeTableSchemaOnline");
|
||||
HTableDescriptor [] tables = admin.listTables();
|
||||
MasterServices masterServices = TEST_UTIL.getMiniHBaseCluster().getMaster();
|
||||
int numTables = tables.length;
|
||||
TEST_UTIL.createTable(tableName, HConstants.CATALOG_FAMILY);
|
||||
tables = this.admin.listTables();
|
||||
|
@ -240,14 +245,11 @@ public class TestAdmin {
|
|||
copy.setValue(key, key);
|
||||
boolean expectedException = false;
|
||||
try {
|
||||
this.admin.modifyTable(tableName, copy);
|
||||
modifyTable(tableName, copy);
|
||||
} catch (TableNotDisabledException re) {
|
||||
expectedException = true;
|
||||
}
|
||||
assertTrue(expectedException);
|
||||
this.admin.disableTable(tableName);
|
||||
assertTrue(this.admin.isTableDisabled(tableName));
|
||||
modifyTable(tableName, copy);
|
||||
assertFalse(expectedException);
|
||||
HTableDescriptor modifiedHtd = this.admin.getTableDescriptor(tableName);
|
||||
// Assert returned modifiedhcd is same as the copy.
|
||||
assertFalse(htd.equals(modifiedHtd));
|
||||
|
@ -255,11 +257,8 @@ public class TestAdmin {
|
|||
assertEquals(newFlushSize, modifiedHtd.getMemStoreFlushSize());
|
||||
assertEquals(key, modifiedHtd.getValue(key));
|
||||
|
||||
// Reenable table to test it fails if not disabled.
|
||||
this.admin.enableTable(tableName);
|
||||
assertFalse(this.admin.isTableDisabled(tableName));
|
||||
|
||||
// Now work on column family changes.
|
||||
htd = this.admin.getTableDescriptor(tableName);
|
||||
int countOfFamilies = modifiedHtd.getFamilies().size();
|
||||
assertTrue(countOfFamilies > 0);
|
||||
HColumnDescriptor hcd = modifiedHtd.getFamilies().iterator().next();
|
||||
|
@ -273,31 +272,25 @@ public class TestAdmin {
|
|||
} catch (TableNotDisabledException re) {
|
||||
expectedException = true;
|
||||
}
|
||||
assertTrue(expectedException);
|
||||
this.admin.disableTable(tableName);
|
||||
assertTrue(this.admin.isTableDisabled(tableName));
|
||||
// Modify Column is synchronous
|
||||
this.admin.modifyColumn(tableName, hcd);
|
||||
assertFalse(expectedException);
|
||||
modifiedHtd = this.admin.getTableDescriptor(tableName);
|
||||
HColumnDescriptor modifiedHcd = modifiedHtd.getFamily(hcdName);
|
||||
assertEquals(newMaxVersions, modifiedHcd.getMaxVersions());
|
||||
|
||||
// Try adding a column
|
||||
// Reenable table to test it fails if not disabled.
|
||||
this.admin.enableTable(tableName);
|
||||
assertFalse(this.admin.isTableDisabled(tableName));
|
||||
final String xtracolName = "xtracol";
|
||||
htd = this.admin.getTableDescriptor(tableName);
|
||||
HColumnDescriptor xtracol = new HColumnDescriptor(xtracolName);
|
||||
xtracol.setValue(xtracolName, xtracolName);
|
||||
expectedException = false;
|
||||
try {
|
||||
this.admin.addColumn(tableName, xtracol);
|
||||
} catch (TableNotDisabledException re) {
|
||||
expectedException = true;
|
||||
}
|
||||
assertTrue(expectedException);
|
||||
this.admin.disableTable(tableName);
|
||||
assertTrue(this.admin.isTableDisabled(tableName));
|
||||
this.admin.addColumn(tableName, xtracol);
|
||||
// Add column should work even if the table is enabled
|
||||
assertFalse(expectedException);
|
||||
modifiedHtd = this.admin.getTableDescriptor(tableName);
|
||||
hcd = modifiedHtd.getFamily(xtracol.getName());
|
||||
assertTrue(hcd != null);
|
||||
|
@ -310,6 +303,7 @@ public class TestAdmin {
|
|||
assertTrue(hcd == null);
|
||||
|
||||
// Delete the table
|
||||
this.admin.disableTable(tableName);
|
||||
this.admin.deleteTable(tableName);
|
||||
this.admin.listTables();
|
||||
assertFalse(this.admin.tableExists(tableName));
|
||||
|
|
Loading…
Reference in New Issue