HBASE-1420 add ability to add and remove (table) indexes on existing tables
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@776395 13f79535-47bb-0310-9956-ffa450edef68
parent 3e180d7ad0
commit 16373cb711
@@ -266,6 +266,8 @@ Release 0.20.0 - Unreleased
    HBASE-1432  LuceneDocumentWrapper is not public
    HBASE-1401  close HLog (and open new one) if there hasnt been edits in N
                minutes/hours
+   HBASE-1420  add abliity to add and remove (table) indexes on existing
+               tables (Clint Morgan via Stack)

 OPTIMIZATIONS
    HBASE-1412  Change values for delete column and column family in KeyValue
@@ -448,6 +448,10 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor>, I
     indexes.put(index.getIndexId(), index);
   }

+  public void removeIndex(String indexId) {
+    indexes.remove(indexId);
+  }
+
   /**
    * Adds a column family.
    * @param family HColumnDescriptor of familyto add.
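
Note: the new removeIndex is the descriptor-side counterpart of the existing addIndex/getIndex accessors, with indexes keyed by indexId. The sketch below shows how a caller is expected to drive it (the same sequence IndexedTableAdmin.removeIndex uses later in this commit). The sketch is not part of the patch: the class and method names are hypothetical, and HBaseAdmin.getTableDescriptor/modifyTable are assumed to be callable here as they are inside the patch.

import java.io.IOException;

import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;

public class DescriptorIndexSketch {
  // Drop an index entry from a table descriptor and push the modified schema
  // back to the master. Illustration only; the table is assumed to be
  // disabled by the caller, as IndexedTableAdmin.removeIndex does below.
  static IndexSpecification dropIndexFromDescriptor(HBaseAdmin admin,
      byte[] tableName, String indexId) throws IOException {
    HTableDescriptor desc = admin.getTableDescriptor(tableName);
    IndexSpecification spec = desc.getIndex(indexId); // still needed to locate the index table
    desc.removeIndex(indexId);                        // new method added in this hunk
    admin.modifyTable(tableName, desc);               // push the modified schema back
    return spec;
  }
}
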
@@ -21,8 +21,13 @@ package org.apache.hadoop.hbase.client.tableindexed;

 import java.io.IOException;
 import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
 import java.util.TreeSet;
+import java.util.Map.Entry;

+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.hbase.ColumnNameParseException;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HColumnDescriptor;
@@ -31,6 +36,11 @@ import org.apache.hadoop.hbase.HTableDescriptor;
 import org.apache.hadoop.hbase.MasterNotRunningException;
 import org.apache.hadoop.hbase.TableExistsException;
 import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.regionserver.tableindexed.IndexMaintenanceUtils;
 import org.apache.hadoop.hbase.util.Bytes;

 /**
@@ -39,6 +49,8 @@ import org.apache.hadoop.hbase.util.Bytes;
  */
 public class IndexedTableAdmin extends HBaseAdmin {

+  private static final Log LOG = LogFactory.getLog(IndexedTableAdmin.class);
+
   /**
    * Constructor
    *
@@ -94,4 +106,49 @@ public class IndexedTableAdmin extends HBaseAdmin {

     return indexTableDesc;
   }
+
+  /** Remove an index for a table.
+   * @throws IOException
+   *
+   */
+  public void removeIndex(byte[] baseTableName, String indexId) throws IOException {
+    super.disableTable(baseTableName);
+    HTableDescriptor desc = super.getTableDescriptor(baseTableName);
+    IndexSpecification spec = desc.getIndex(indexId);
+    desc.removeIndex(indexId);
+    this.disableTable(spec.getIndexedTableName(baseTableName));
+    this.deleteTable(spec.getIndexedTableName(baseTableName));
+    super.modifyTable(baseTableName, desc);
+    super.enableTable(baseTableName);
+  }
+
+  /** Add an index to a table. */
+  public void addIndex(byte []baseTableName, IndexSpecification indexSpec) throws IOException {
+    LOG.warn("Adding index to existing table ["+Bytes.toString(baseTableName)+"], this may take a long time");
+    // TODO, make table read-only
+    LOG.warn("Not putting table in readonly, if its being written to, the index may get out of sync");
+    HTableDescriptor indexTableDesc = createIndexTableDesc(baseTableName, indexSpec);
+    super.createTable(indexTableDesc);
+    super.disableTable(baseTableName);
+    HTableDescriptor desc = super.getTableDescriptor(baseTableName);
+    desc.addIndex(indexSpec);
+    super.modifyTable(baseTableName, desc);
+    super.enableTable(baseTableName);
+    reIndexTable(baseTableName, indexSpec);
+  }
+
+  private void reIndexTable(byte[] baseTableName, IndexSpecification indexSpec) throws IOException {
+    HTable baseTable = new HTable(baseTableName);
+    HTable indexTable = new HTable(indexSpec.getIndexedTableName(baseTableName));
+    for (RowResult rowResult : baseTable.getScanner(indexSpec.getAllColumns())) {
+      SortedMap<byte[], byte[]> columnValues = new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+      for (Entry<byte[], Cell> entry : rowResult.entrySet()) {
+        columnValues.put(entry.getKey(), entry.getValue().getValue());
+      }
+      if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, columnValues)) {
+        BatchUpdate indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, rowResult.getRow(), columnValues);
+        indexTable.commit(indexUpdate);
+      }
+    }
+  }
 }
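
Note: a hedged usage sketch of the two new admin operations follows. Only IndexedTableAdmin.addIndex and removeIndex are defined by this patch; the single-column IndexSpecification constructor, the table name, and the column names are assumptions made for illustration.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin;
import org.apache.hadoop.hbase.util.Bytes;

public class AddRemoveIndexSketch {
  public static void main(String[] args) throws IOException {
    HBaseConfiguration conf = new HBaseConfiguration();
    IndexedTableAdmin admin = new IndexedTableAdmin(conf);

    byte[] table = Bytes.toBytes("people");
    // Assumed single-column IndexSpecification constructor; the index id and
    // the indexed column are made up for this example.
    IndexSpecification byLastName =
        new IndexSpecification("byLastName", Bytes.toBytes("attr:lastname"));

    // Creates the index table, adds the spec to the base table descriptor,
    // then scans the base table to back-fill existing rows (reIndexTable).
    admin.addIndex(table, byLastName);

    // Disables the base table, drops the spec from its descriptor, and
    // deletes the index table before re-enabling the base table.
    admin.removeIndex(table, "byLastName");
  }
}
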
@@ -141,7 +141,7 @@ class IndexedRegion extends TransactionalRegion {
     Iterator<IndexSpecification> indexIterator = indexesToUpdate.iterator();
     while (indexIterator.hasNext()) {
       IndexSpecification indexSpec = indexIterator.next();
-      if (!doesApplyToIndex(indexSpec, newColumnValues)) {
+      if (!IndexMaintenanceUtils.doesApplyToIndex(indexSpec, newColumnValues)) {
         indexIterator.remove();
       }
     }
@@ -210,60 +210,19 @@ class IndexedRegion extends TransactionalRegion {
     return false;
   }

-  /** Ask if this update does apply to the index.
-   *
-   * @param indexSpec
-   * @param b
-   * @return true if possibly apply.
-   */
-  private boolean doesApplyToIndex(IndexSpecification indexSpec, SortedMap<byte[], byte[]> columnValues) {
-
-    for (byte [] neededCol : indexSpec.getIndexedColumns()) {
-      if (! columnValues.containsKey(neededCol)) {
-        LOG.debug("Index [" + indexSpec.getIndexId() + "] can't be updated because ["
-            + Bytes.toString(neededCol) + "] is missing");
-        return false;
-      }
-    }
-    return true;
-  }
-
+  // FIXME: This call takes place in an RPC, and requires an RPC. This makes for
+  // a likely deadlock if the number of RPCs we are trying to serve is >= the
+  // number of handler threads.
   private void updateIndex(IndexSpecification indexSpec, byte[] row,
       SortedMap<byte[], byte[]> columnValues) throws IOException {
-    BatchUpdate indexUpdate = createIndexUpdate(indexSpec, row, columnValues);
-    getIndexTable(indexSpec).commit(indexUpdate);
+    BatchUpdate indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, row, columnValues);
+    getIndexTable(indexSpec).commit(indexUpdate); // FIXME, this is synchronized
     LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["
         + Bytes.toString(indexUpdate.getRow()) + "] for row ["
         + Bytes.toString(row) + "]");

   }

-  private BatchUpdate createIndexUpdate(IndexSpecification indexSpec,
-      byte[] row, SortedMap<byte[], byte[]> columnValues) {
-    byte[] indexRow = indexSpec.getKeyGenerator().createIndexKey(row,
-        columnValues);
-    BatchUpdate update = new BatchUpdate(indexRow);
-
-    update.put(IndexedTable.INDEX_BASE_ROW_COLUMN, row);
-
-    for (byte[] col : indexSpec.getIndexedColumns()) {
-      byte[] val = columnValues.get(col);
-      if (val == null) {
-        throw new RuntimeException("Unexpected missing column value. ["+Bytes.toString(col)+"]");
-      }
-      update.put(col, val);
-    }
-
-    for (byte [] col : indexSpec.getAdditionalColumns()) {
-      byte[] val = columnValues.get(col);
-      if (val != null) {
-        update.put(col, val);
-      }
-    }
-
-    return update;
-  }
-
   @Override
   public void deleteAll(final byte[] row, final long ts, final Integer lockid)
       throws IOException {
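
Note: the private doesApplyToIndex and createIndexUpdate removed above are not dropped; they move into the shared IndexMaintenanceUtils helper (added elsewhere in this commit, not shown in this hunk) so that IndexedTableAdmin.reIndexTable can reuse the same logic as the region-side index maintenance. The sketch below reconstructs the expected shape of that helper from the deleted bodies; the real class may differ in details such as logging.

package org.apache.hadoop.hbase.regionserver.tableindexed;

import java.util.SortedMap;

import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.util.Bytes;

public class IndexMaintenanceUtils {

  // True if the update carries every column the index needs.
  public static boolean doesApplyToIndex(IndexSpecification indexSpec,
      SortedMap<byte[], byte[]> columnValues) {
    for (byte[] neededCol : indexSpec.getIndexedColumns()) {
      if (!columnValues.containsKey(neededCol)) {
        return false;
      }
    }
    return true;
  }

  // Build the row for the index table: key from the index's KeyGenerator,
  // plus the base-row pointer, the indexed columns, and any extra columns.
  public static BatchUpdate createIndexUpdate(IndexSpecification indexSpec,
      byte[] row, SortedMap<byte[], byte[]> columnValues) {
    byte[] indexRow = indexSpec.getKeyGenerator().createIndexKey(row, columnValues);
    BatchUpdate update = new BatchUpdate(indexRow);
    update.put(IndexedTable.INDEX_BASE_ROW_COLUMN, row);
    for (byte[] col : indexSpec.getIndexedColumns()) {
      byte[] val = columnValues.get(col);
      if (val == null) {
        throw new RuntimeException("Unexpected missing column value. [" + Bytes.toString(col) + "]");
      }
      update.put(col, val);
    }
    for (byte[] col : indexSpec.getAdditionalColumns()) {
      byte[] val = columnValues.get(col);
      if (val != null) {
        update.put(col, val);
      }
    }
    return update;
  }
}
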
@@ -289,7 +248,7 @@ class IndexedRegion extends TransactionalRegion {
     SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentColumnCells);

     for (IndexSpecification indexSpec : getIndexes()) {
-      if (doesApplyToIndex(indexSpec, currentColumnValues)) {
+      if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, currentColumnValues)) {
         updateIndex(indexSpec, row, currentColumnValues);
       }
     }
@@ -334,7 +293,7 @@ class IndexedRegion extends TransactionalRegion {
     SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentColumnCells);

     for (IndexSpecification indexSpec : getIndexes()) {
-      if (doesApplyToIndex(indexSpec, currentColumnValues)) {
+      if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, currentColumnValues)) {
         updateIndex(indexSpec, row, currentColumnValues);
       }
     }