HBASE-883 Secondary indexes
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@718317 13f79535-47bb-0310-9956-ffa450edef68
parent f759be8c0d · commit fc56f9ca0d
@@ -127,6 +127,7 @@ Release 0.19.0 - Unreleased
   HBASE-875   Use MurmurHash instead of JenkinsHash [in bloomfilters]
               (Andrzej Bialecki via Stack)
   HBASE-625   Metrics support for cluster load history: emissions and graphs
   HBASE-883   Secondary indexes (Clint Morgan via Andrew Purtell)

  OPTIMIZATIONS
   HBASE-748   Add an efficient way to batch update many rows

@@ -530,6 +530,9 @@ public class HStoreKey implements WritableComparable<HStoreKey> {
        rowCompare = Bytes.compareTo(keysA[1], keysB[1]);
        return rowCompare;
      }
      if (regionInfo != null && regionInfo.getTableDesc().getRowKeyComparator() != null) {
        return regionInfo.getTableDesc().getRowKeyComparator().compare(rowA, rowB);
      }
      return Bytes.compareTo(rowA, rowB);
    }

@@ -19,8 +19,12 @@
 */
package org.apache.hadoop.hbase;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.Collection;
import java.util.Collections;
@@ -29,8 +33,10 @@ import java.util.Iterator;
import java.util.Map;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.WritableComparable;

/**
@@ -41,7 +47,8 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {

   // Changes prior to version 3 were not recorded here.
   // Version 3 adds metadata as a map where keys and values are byte[].
-  public static final byte TABLE_DESCRIPTOR_VERSION = 3;
+  // Version 4 adds indexes
+  public static final byte TABLE_DESCRIPTOR_VERSION = 4;

   private byte [] name = HConstants.EMPTY_BYTE_ARRAY;
   private String nameAsString = "";
@@ -66,9 +73,13 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
  public static final ImmutableBytesWritable IS_ROOT_KEY =
    new ImmutableBytesWritable(Bytes.toBytes(IS_ROOT));
  public static final String IS_META = "IS_META";

  public static final String ROW_KEY_COMPARATOR = "ROW_KEY_COMPARATOR";

  public static final ImmutableBytesWritable IS_META_KEY =
    new ImmutableBytesWritable(Bytes.toBytes(IS_META));

  // The below are ugly but better than creating them each time till we
  // replace booleans being saved as Strings with plain booleans.  Need a
  // migration script to do this.  TODO.
@@ -90,6 +101,10 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
  private final Map<Integer, HColumnDescriptor> families =
    new HashMap<Integer, HColumnDescriptor>();

  // Key is indexId
  private final Map<String, IndexSpecification> indexes =
    new HashMap<String, IndexSpecification>();

  /**
   * Private constructor used internally creating table descriptors for
   * catalog tables: e.g. .META. and -ROOT-.
@@ -108,6 +123,7 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
   * catalog tables: e.g. .META. and -ROOT-.
   */
  protected HTableDescriptor(final byte [] name, HColumnDescriptor[] families,
      Collection<IndexSpecification> indexes,
      Map<ImmutableBytesWritable,ImmutableBytesWritable> values) {
    this.name = name.clone();
    this.nameAsString = Bytes.toString(this.name);
@@ -115,6 +131,9 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
    for(HColumnDescriptor descriptor : families) {
      this.families.put(Bytes.mapKey(descriptor.getName()), descriptor);
    }
    for(IndexSpecification index : indexes) {
      this.indexes.put(index.getIndexId(), index);
    }
    for (Map.Entry<ImmutableBytesWritable, ImmutableBytesWritable> entry:
        values.entrySet()) {
      this.values.put(entry.getKey(), entry.getValue());
@ -413,6 +432,58 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
|
|||
Bytes.toBytes(Integer.toString(memcacheFlushSize)));
|
||||
}
|
||||
|
||||
|
||||
public void setRowKeyComparator(WritableComparator<byte[]> newComparator) {
|
||||
if (newComparator == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
ByteArrayOutputStream bos = new ByteArrayOutputStream();
|
||||
DataOutputStream dos = new DataOutputStream(bos);
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
try {
|
||||
ObjectWritable.writeObject(dos, newComparator, WritableComparator.class, conf);
|
||||
dos.close();
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
setValue(ROW_KEY_COMPARATOR.getBytes(), bos.toByteArray());
|
||||
this.comparator = newComparator;
|
||||
}
|
||||
|
||||
private WritableComparator<byte[]> comparator = null;
|
||||
public WritableComparator<byte[]> getRowKeyComparator() {
|
||||
if (comparator != null) {
|
||||
return comparator;
|
||||
}
|
||||
byte[] bytes = getValue(ROW_KEY_COMPARATOR.getBytes());
|
||||
if (bytes == null) {
|
||||
return null;
|
||||
}
|
||||
ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
|
||||
DataInputStream in = new DataInputStream(bis);
|
||||
HBaseConfiguration conf = new HBaseConfiguration();
|
||||
try {
|
||||
comparator = (WritableComparator<byte[]>) ObjectWritable.readObject(in, conf);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
return comparator;
|
||||
}
|
||||
|
||||
|
||||
public Collection<IndexSpecification> getIndexes() {
|
||||
return indexes.values();
|
||||
}
|
||||
|
||||
public IndexSpecification getIndex(String indexId) {
|
||||
return indexes.get(indexId);
|
||||
}
|
||||
|
||||
public void addIndex(IndexSpecification index) {
|
||||
indexes.put(index.getIndexId(), index);
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a column family.
|
||||
* @param family HColumnDescriptor of familyto add.
|
||||
|
@@ -519,6 +590,16 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
      c.readFields(in);
      families.put(Bytes.mapKey(c.getName()), c);
    }
    indexes.clear();
    if (version < 4) {
      return;
    }
    int numIndexes = in.readInt();
    for (int i = 0; i < numIndexes; i++) {
      IndexSpecification index = new IndexSpecification();
      index.readFields(in);
      addIndex(index);
    }
  }

  public void write(DataOutput out) throws IOException {
@@ -538,6 +619,10 @@ public class HTableDescriptor implements WritableComparable<HTableDescriptor> {
      HColumnDescriptor family = it.next();
      family.write(out);
    }
    out.writeInt(indexes.size());
    for(IndexSpecification index : indexes.values()) {
      index.write(out);
    }
  }

  // Comparable

@@ -0,0 +1,28 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase;

import java.util.Comparator;

import org.apache.hadoop.io.Writable;

public interface WritableComparator<T> extends Writable, Comparator<T> {
  // No methods, just bring the two interfaces together
}

@@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client;

 import org.apache.hadoop.hbase.HColumnDescriptor;
 import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;

 /**
  * Read-only table descriptor.
@@ -37,7 +38,7 @@ public class UnmodifyableHTableDescriptor extends HTableDescriptor {
    * @param desc
    */
   UnmodifyableHTableDescriptor(final HTableDescriptor desc) {
-    super(desc.getName(), getUnmodifyableFamilies(desc), desc.getValues());
+    super(desc.getName(), getUnmodifyableFamilies(desc), desc.getIndexes(), desc.getValues());
   }

   /*
@@ -108,4 +109,9 @@ public class UnmodifyableHTableDescriptor extends HTableDescriptor {
   public void setMemcacheFlushSize(int memcacheFlushSize) {
     throw new UnsupportedOperationException("HTableDescriptor is read-only");
   }
+
+  @Override
+  public void addIndex(IndexSpecification index) {
+    throw new UnsupportedOperationException("HTableDescriptor is read-only");
+  }
 }

@@ -0,0 +1,29 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.tableindexed;

import java.util.Map;

import org.apache.hadoop.io.Writable;

public interface IndexKeyGenerator extends Writable {

  byte [] createIndexKey(byte [] rowKey, Map<byte [], byte []> columns);
}

@@ -0,0 +1,45 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.tableindexed;

import java.io.IOException;

/**
 * Thrown when asking for an index that does not exist.
 */
public class IndexNotFoundException extends IOException {

  public IndexNotFoundException() {
    super();
  }

  public IndexNotFoundException(String message) {
    super(message);
  }

  public IndexNotFoundException(Throwable cause) {
    super(cause.getMessage());
  }

  public IndexNotFoundException(String message, Throwable cause) {
    super(message + cause.getMessage());
  }
}

@@ -0,0 +1,200 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.tableindexed;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.WritableComparator;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.ObjectWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

/** Holds the specification for a single secondary index. */
public class IndexSpecification implements Writable {

  // Columns that are indexed (part of the indexRowKey)
  private byte[][] indexedColumns;

  // Constructs the index row key from the original row key and columns
  private IndexKeyGenerator keyGenerator;

  private WritableComparator<byte[]> keyComparator;

  // Additional columns mapped into the indexed row. These will be available for
  // filters when scanning the index.
  private byte[][] additionalColumns;

  private byte[][] allColumns;

  // Id of this index, unique within a table.
  private String indexId;

  /** Construct a "simple" index spec for a single column. */
  public IndexSpecification(String indexId, byte[] indexedColumn,
      boolean ascending) {
    this(indexId, new byte[][] { indexedColumn }, null,
        new SimpleIndexKeyGenerator(indexedColumn),
        ascending ? null : new ReverseByteArrayComparator());
  }

  /**
   * Construct an index spec by specifying everything.
   *
   * @param indexId
   * @param indexedColumns
   * @param additionalColumns
   * @param keyGenerator
   * @param keyComparator
   */
  public IndexSpecification(String indexId, byte[][] indexedColumns,
      byte[][] additionalColumns, IndexKeyGenerator keyGenerator,
      WritableComparator<byte[]> keyComparator) {
    this.indexId = indexId;
    this.indexedColumns = indexedColumns;
    this.additionalColumns = additionalColumns;
    this.keyGenerator = keyGenerator;
    this.keyComparator = keyComparator;
    this.makeAllColumns();
  }

  public IndexSpecification() {
    // For writable
  }

  private void makeAllColumns() {
    this.allColumns = new byte[indexedColumns.length
        + (additionalColumns == null ? 0 : additionalColumns.length)][];
    System.arraycopy(indexedColumns, 0, allColumns, 0, indexedColumns.length);
    if (additionalColumns != null) {
      System.arraycopy(additionalColumns, 0, allColumns, indexedColumns.length,
          additionalColumns.length);
    }
  }

  /**
   * Get the indexedColumns.
   *
   * @return Return the indexedColumns.
   */
  public byte[][] getIndexedColumns() {
    return indexedColumns;
  }

  /**
   * Get the keyGenerator.
   *
   * @return Return the keyGenerator.
   */
  public IndexKeyGenerator getKeyGenerator() {
    return keyGenerator;
  }

  /**
   * Get the keyComparator.
   *
   * @return Return the keyComparator.
   */
  public WritableComparator<byte[]> getKeyComparator() {
    return keyComparator;
  }

  /**
   * Get the additionalColumns.
   *
   * @return Return the additionalColumns.
   */
  public byte[][] getAdditionalColumns() {
    return additionalColumns;
  }

  /**
   * Get the indexId.
   *
   * @return Return the indexId.
   */
  public String getIndexId() {
    return indexId;
  }

  public byte[][] getAllColumns() {
    return allColumns;
  }

  public boolean containsColumn(byte[] column) {
    for (byte[] col : allColumns) {
      if (Bytes.equals(column, col)) {
        return true;
      }
    }
    return false;
  }

  public byte[] getIndexedTableName(byte[] baseTableName) {
    return Bytes.add(baseTableName, Bytes.toBytes("-" + indexId));
  }

  /** {@inheritDoc} */
  @SuppressWarnings("unchecked")
  public void readFields(DataInput in) throws IOException {
    indexId = in.readUTF();
    int numIndexedCols = in.readInt();
    indexedColumns = new byte[numIndexedCols][];
    for (int i = 0; i < numIndexedCols; i++) {
      indexedColumns[i] = Bytes.readByteArray(in);
    }
    int numAdditionalCols = in.readInt();
    additionalColumns = new byte[numAdditionalCols][];
    for (int i = 0; i < numAdditionalCols; i++) {
      additionalColumns[i] = Bytes.readByteArray(in);
    }
    makeAllColumns();
    HBaseConfiguration conf = new HBaseConfiguration();
    keyGenerator = (IndexKeyGenerator) ObjectWritable.readObject(in, conf);
    keyComparator = (WritableComparator<byte[]>) ObjectWritable.readObject(in, conf);
  }

  /** {@inheritDoc} */
  public void write(DataOutput out) throws IOException {
    out.writeUTF(indexId);
    out.writeInt(indexedColumns.length);
    for (byte[] col : indexedColumns) {
      Bytes.writeByteArray(out, col);
    }
    if (additionalColumns != null) {
      out.writeInt(additionalColumns.length);
      for (byte[] col : additionalColumns) {
        Bytes.writeByteArray(out, col);
      }
    } else {
      out.writeInt(0);
    }
    HBaseConfiguration conf = new HBaseConfiguration();
    ObjectWritable.writeObject(out, keyGenerator, IndexKeyGenerator.class, conf);
    ObjectWritable.writeObject(out, keyComparator, WritableComparable.class, conf);
  }
}

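[Editor's illustration, not part of this commit: how the two constructors above are meant to be used. Table and column names here are hypothetical; the classes are the ones added by this patch, and org.apache.hadoop.hbase.util.Bytes is assumed imported.]

    // A "simple" single-column index, ascending byte order:
    IndexSpecification byValue = new IndexSpecification(
        "byValue", Bytes.toBytes("family:value"), true);

    // The fully specified form: the same indexed column, but descending order,
    // plus one extra column copied into the index table for index-side filters.
    IndexSpecification byValueDesc = new IndexSpecification(
        "byValueDesc",
        new byte[][] { Bytes.toBytes("family:value") },
        new byte[][] { Bytes.toBytes("family:label") },   // additional columns
        new SimpleIndexKeyGenerator(Bytes.toBytes("family:value")),
        new ReverseByteArrayComparator());                // descending key order
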
@@ -0,0 +1,222 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.tableindexed;

import java.io.IOException;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.util.Bytes;

/** HTable extended with indexed support. */
public class IndexedTable extends TransactionalTable {

  // FIXME, these belong elsewhere
  public static final byte[] INDEX_COL_FAMILY_NAME = Bytes.toBytes("__INDEX__");
  public static final byte[] INDEX_COL_FAMILY = Bytes.add(
      INDEX_COL_FAMILY_NAME, new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER });
  public static final byte[] INDEX_BASE_ROW_COLUMN = Bytes.add(
      INDEX_COL_FAMILY, Bytes.toBytes("ROW"));

  private static final Log LOG = LogFactory.getLog(IndexedTable.class);

  private Map<String, HTable> indexIdToTable = new HashMap<String, HTable>();

  /** {@inheritDoc} */
  public IndexedTable(final HBaseConfiguration conf, final byte[] tableName)
      throws IOException {
    super(conf, tableName);

    for (IndexSpecification spec : super.getTableDescriptor().getIndexes()) {
      indexIdToTable.put(spec.getIndexId(),
          new HTable(conf, spec.getIndexedTableName(tableName)));
    }
  }

  /**
   * Open up an indexed scanner. Results will come back in the indexed order,
   * but will contain RowResults from the original table.
   *
   * @param indexId the id of the index to use
   * @param indexStartRow (created from the IndexKeyGenerator)
   * @param indexColumns in the index table
   * @param indexFilter filter to run on the indexed table. This can only use
   *          columns that have been added to the index.
   * @param baseColumns from the original table
   * @return scanner
   * @throws IOException
   * @throws IndexNotFoundException
   */
  public Scanner getIndexedScanner(String indexId, final byte[] indexStartRow,
      byte[][] indexColumns, final RowFilterInterface indexFilter,
      final byte[][] baseColumns) throws IOException, IndexNotFoundException {
    IndexSpecification indexSpec = super.getTableDescriptor().getIndex(indexId);
    if (indexSpec == null) {
      throw new IndexNotFoundException("Index " + indexId
          + " not defined in table "
          + super.getTableDescriptor().getNameAsString());
    }
    verifyIndexColumns(indexColumns, indexSpec);
    // TODO, verify/remove index columns from baseColumns

    HTable indexTable = indexIdToTable.get(indexId);

    byte[][] allIndexColumns;
    if (indexColumns != null) {
      allIndexColumns = new byte[indexColumns.length + 1][];
      System.arraycopy(indexColumns, 0, allIndexColumns, 0, indexColumns.length);
      allIndexColumns[indexColumns.length] = INDEX_BASE_ROW_COLUMN;
    } else {
      byte[][] allColumns = indexSpec.getAllColumns();
      allIndexColumns = new byte[allColumns.length + 1][];
      System.arraycopy(allColumns, 0, allIndexColumns, 0, allColumns.length);
      allIndexColumns[allColumns.length] = INDEX_BASE_ROW_COLUMN;
    }

    Scanner indexScanner = indexTable.getScanner(allIndexColumns,
        indexStartRow, indexFilter);

    return new ScannerWrapper(indexScanner, baseColumns);
  }

  private void verifyIndexColumns(byte[][] requestedColumns,
      IndexSpecification indexSpec) {
    if (requestedColumns == null) {
      return;
    }
    for (byte[] requestedColumn : requestedColumns) {
      boolean found = false;
      for (byte[] indexColumn : indexSpec.getAllColumns()) {
        if (Bytes.equals(requestedColumn, indexColumn)) {
          found = true;
          break;
        }
      }
      if (!found) {
        throw new RuntimeException("Column [" + Bytes.toString(requestedColumn)
            + "] not in index " + indexSpec.getIndexId());
      }
    }
  }

  private class ScannerWrapper implements Scanner {

    private Scanner indexScanner;
    private byte[][] columns;

    public ScannerWrapper(Scanner indexScanner, byte[][] columns) {
      this.indexScanner = indexScanner;
      this.columns = columns;
    }

    /** {@inheritDoc} */
    public RowResult next() throws IOException {
      RowResult[] result = next(1);
      if (result == null || result.length < 1) {
        return null;
      }
      return result[0];
    }

    /** {@inheritDoc} */
    public RowResult[] next(int nbRows) throws IOException {
      RowResult[] indexResult = indexScanner.next(nbRows);
      if (indexResult == null) {
        return null;
      }
      RowResult[] result = new RowResult[indexResult.length];
      for (int i = 0; i < indexResult.length; i++) {
        RowResult row = indexResult[i];
        byte[] baseRow = row.get(INDEX_BASE_ROW_COLUMN).getValue();
        LOG.debug("next index row [" + Bytes.toString(row.getRow())
            + "] -> base row [" + Bytes.toString(baseRow) + "]");
        HbaseMapWritable<byte[], Cell> colValues =
            new HbaseMapWritable<byte[], Cell>();
        if (columns != null && columns.length > 0) {
          LOG.debug("Going to base table for remaining columns");
          RowResult baseResult = IndexedTable.this.getRow(baseRow, columns);
          colValues.putAll(baseResult);
        }
        for (Entry<byte[], Cell> entry : row.entrySet()) {
          byte[] col = entry.getKey();
          if (HStoreKey.matchingFamily(INDEX_COL_FAMILY_NAME, col)) {
            continue;
          }
          colValues.put(col, entry.getValue());
        }
        result[i] = new RowResult(baseRow, colValues);
      }
      return result;
    }

    /** {@inheritDoc} */
    public void close() {
      indexScanner.close();
    }

    /** {@inheritDoc} */
    public Iterator<RowResult> iterator() {
      // FIXME, copied from HTable.ClientScanner. Extract this to a common base
      // class?
      return new Iterator<RowResult>() {
        RowResult next = null;

        public boolean hasNext() {
          if (next == null) {
            try {
              next = ScannerWrapper.this.next();
              return next != null;
            } catch (IOException e) {
              throw new RuntimeException(e);
            }
          }
          return true;
        }

        public RowResult next() {
          if (!hasNext()) {
            return null;
          }
          RowResult temp = next;
          next = null;
          return temp;
        }

        public void remove() {
          throw new UnsupportedOperationException();
        }
      };
    }
  }
}

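[Editor's illustration, not part of this commit: a hedged sketch of the getIndexedScanner() contract above, modeled on the unit test added later in this patch. The table name "people" and column "family:value" are hypothetical; the table is assumed to have been created with a "byValue" index.]

    IndexedTable people = new IndexedTable(new HBaseConfiguration(),
        Bytes.toBytes("people"));
    Scanner scanner = people.getIndexedScanner(
        "byValue",
        HConstants.EMPTY_START_ROW, // start at the beginning of the index
        null,                       // fetch all index columns
        null,                       // no index-side filter
        new byte[][] { Bytes.toBytes("family:value") }); // columns from base table
    try {
      for (RowResult row : scanner) {
        // row.getRow() is the *base* table row key; values are merged from the
        // index row and, where requested, a getRow() against the base table.
        System.out.println(Bytes.toString(row.getRow()));
      }
    } finally {
      scanner.close();
    }
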
@@ -0,0 +1,99 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.tableindexed;

import java.io.IOException;
import java.util.Set;
import java.util.TreeSet;

import org.apache.hadoop.hbase.ColumnNameParseException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.util.Bytes;

/**
 * Extension of HBaseAdmin that creates indexed tables.
 */
public class IndexedTableAdmin extends HBaseAdmin {

  /**
   * Constructor
   *
   * @param conf Configuration object
   * @throws MasterNotRunningException
   */
  public IndexedTableAdmin(HBaseConfiguration conf)
      throws MasterNotRunningException {
    super(conf);
  }

  /**
   * Creates a new table
   *
   * @param desc table descriptor for table
   *
   * @throws IllegalArgumentException if the table name is reserved
   * @throws MasterNotRunningException if master is not running
   * @throws TableExistsException if table already exists (If concurrent
   *           threads, the table may have been created between
   *           test-for-existence and attempt-at-creation).
   * @throws IOException
   */
  @Override
  public void createTable(HTableDescriptor desc) throws IOException {
    super.createTable(desc);
    this.createIndexTables(desc);
  }

  private void createIndexTables(HTableDescriptor tableDesc) throws IOException {
    byte[] baseTableName = tableDesc.getName();
    for (IndexSpecification indexSpec : tableDesc.getIndexes()) {
      HTableDescriptor indexTableDesc = createIndexTableDesc(baseTableName, indexSpec);
      super.createTable(indexTableDesc);
    }
  }

  private HTableDescriptor createIndexTableDesc(byte[] baseTableName,
      IndexSpecification indexSpec) throws ColumnNameParseException {
    HTableDescriptor indexTableDesc = new HTableDescriptor(
        indexSpec.getIndexedTableName(baseTableName));
    Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    families.add(IndexedTable.INDEX_COL_FAMILY);
    for (byte[] column : indexSpec.getAllColumns()) {
      families.add(Bytes.add(HStoreKey.getFamily(column),
          new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER }));
    }

    for (byte[] colFamily : families) {
      indexTableDesc.addFamily(new HColumnDescriptor(colFamily));
    }

    indexTableDesc.setRowKeyComparator(indexSpec.getKeyComparator());

    return indexTableDesc;
  }
}

@@ -0,0 +1,46 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.tableindexed;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.hbase.WritableComparator;
import org.apache.hadoop.hbase.util.Bytes;

public class ReverseByteArrayComparator implements WritableComparator<byte[]> {

  /** {@inheritDoc} */
  public int compare(byte[] o1, byte[] o2) {
    return Bytes.compareTo(o2, o1);
  }

  /** {@inheritDoc} */
  public void readFields(DataInput arg0) throws IOException {
    // Nothing
  }

  /** {@inheritDoc} */
  public void write(DataOutput arg0) throws IOException {
    // Nothing
  }
}

@@ -0,0 +1,59 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.tableindexed;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.Map;

import org.apache.hadoop.hbase.util.Bytes;

/** Creates index row keys for a single column: the column value followed by
 * the original row key.
 */
public class SimpleIndexKeyGenerator implements IndexKeyGenerator {

  private byte [] column;

  public SimpleIndexKeyGenerator(byte [] column) {
    this.column = column;
  }

  public SimpleIndexKeyGenerator() {
    // For Writable
  }

  /** {@inheritDoc} */
  public byte[] createIndexKey(byte[] rowKey, Map<byte[], byte[]> columns) {
    return Bytes.add(columns.get(column), rowKey);
  }

  /** {@inheritDoc} */
  public void readFields(DataInput in) throws IOException {
    column = Bytes.readByteArray(in);
  }

  /** {@inheritDoc} */
  public void write(DataOutput out) throws IOException {
    Bytes.writeByteArray(out, column);
  }
}

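[Editor's illustration, not part of this commit: a worked example of the key layout this generator produces. Values are hypothetical. Because the column value is prepended to the base row key, index rows sort by value first, then by base row key.]

    SimpleIndexKeyGenerator gen =
        new SimpleIndexKeyGenerator(Bytes.toBytes("family:a"));
    Map<byte[], byte[]> cols =
        new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
    cols.put(Bytes.toBytes("family:a"), Bytes.toBytes("apple"));
    byte[] key = gen.createIndexKey(Bytes.toBytes("row1"), cols);
    // key == Bytes.toBytes("applerow1"): value bytes, then base row key bytes.
    // The base row remains recoverable through the __INDEX__:ROW column that
    // IndexedRegion writes into every index row.
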
@@ -0,0 +1,47 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>

<!--
   Licensed to the Apache Software Foundation (ASF) under one or more
   contributor license agreements.  See the NOTICE file distributed with
   this work for additional information regarding copyright ownership.
   The ASF licenses this file to You under the Apache License, Version 2.0
   (the "License"); you may not use this file except in compliance with
   the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

   Unless required by applicable law or agreed to in writing, software
   distributed under the License is distributed on an "AS IS" BASIS,
   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
   See the License for the specific language governing permissions and
   limitations under the License.
-->

<head />
<body bgcolor="white">

This package provides support for secondary indexing by maintaining a separate "index" table for each index.

The IndexSpecification class provides the metadata for an index. This includes:
<ul>
<li> the columns that contribute to the index key,</li>
<li> additional columns to put in the index table (which are thus made available to filters on the index table),</li>
<li> an IndexKeyGenerator which constructs the index row key from the indexed column(s) and the original row key, and</li>
<li> (optionally) a custom key comparator for the index table, which can allow an index on a deserialized column value.</li>
</ul>

IndexSpecifications can be added to a table's metadata (HTableDescriptor) before the table is constructed.
Afterwards, updates and deletes to the original table will trigger updates to the indexes, and
the indexes can be scanned using the API on IndexedTable. A setup sketch follows below.

For a simple example, look at the unit test in org.apache.hadoop.hbase.client.tableindexed.

<p> To enable indexing, modify hbase-site.xml to turn on the
IndexedRegionServer. This is done by setting
<i>hbase.regionserver.class</i> to
<i>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</i> and
<i>hbase.regionserver.impl</i> to
<i>org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer</i>.
</p>

</body>
</html>

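[Editor's illustration, not part of this commit: the workflow the package documentation describes, pieced together from this patch's unit test. The table name "myTable" and column "family:a" are hypothetical, and the two configuration keys are the ones named in the package documentation above.]

    HBaseConfiguration conf = new HBaseConfiguration();
    conf.set("hbase.regionserver.class",
        IndexedRegionInterface.class.getName());
    conf.set("hbase.regionserver.impl",
        IndexedRegionServer.class.getName());

    HTableDescriptor desc = new HTableDescriptor("myTable");
    desc.addFamily(new HColumnDescriptor(Bytes.toBytes("family:")));
    desc.addIndex(new IndexSpecification("byA", Bytes.toBytes("family:a"), true));

    IndexedTableAdmin admin = new IndexedTableAdmin(conf);
    admin.createTable(desc); // also creates the index table "myTable-byA"
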
@@ -0,0 +1,11 @@
/*
 * $Id$
 * Created on Sep 10, 2008
 */
package org.apache.hadoop.hbase.ipc;

/** Interface for the indexed region server. */
public interface IndexedRegionInterface extends TransactionalRegionInterface {
  // No methods for now...
}

@@ -0,0 +1,342 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.tableindexed;

import java.io.IOException;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Map.Entry;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion;
import org.apache.hadoop.hbase.util.Bytes;

class IndexedRegion extends TransactionalRegion {

  private static final Log LOG = LogFactory.getLog(IndexedRegion.class);

  private final HBaseConfiguration conf;
  private Map<IndexSpecification, HTable> indexSpecToTable =
      new HashMap<IndexSpecification, HTable>();

  public IndexedRegion(final Path basedir, final HLog log, final FileSystem fs,
      final HBaseConfiguration conf, final HRegionInfo regionInfo,
      final FlushRequester flushListener) {
    super(basedir, log, fs, conf, regionInfo, flushListener);
    this.conf = conf;
  }

  private synchronized HTable getIndexTable(IndexSpecification index)
      throws IOException {
    HTable indexTable = indexSpecToTable.get(index);
    if (indexTable == null) {
      indexTable = new HTable(conf, index.getIndexedTableName(super
          .getRegionInfo().getTableDesc().getName()));
      indexSpecToTable.put(index, indexTable);
    }
    return indexTable;
  }

  private Collection<IndexSpecification> getIndexes() {
    return super.getRegionInfo().getTableDesc().getIndexes();
  }

  /**
   * @param batchUpdate
   * @param lockid
   * @param writeToWAL if true, then we write this update to the log
   * @throws IOException
   */
  @Override
  public void batchUpdate(BatchUpdate batchUpdate, Integer lockid, boolean writeToWAL)
      throws IOException {
    updateIndexes(batchUpdate); // Do this first because we will want to see the old row
    super.batchUpdate(batchUpdate, lockid, writeToWAL);
  }

  private void updateIndexes(BatchUpdate batchUpdate) throws IOException {
    List<IndexSpecification> indexesToUpdate = new LinkedList<IndexSpecification>();

    // Find the indexes we need to update
    for (IndexSpecification index : getIndexes()) {
      if (possiblyAppliesToIndex(index, batchUpdate)) {
        indexesToUpdate.add(index);
      }
    }

    if (indexesToUpdate.size() == 0) {
      return;
    }

    Set<byte[]> neededColumns = getColumnsForIndexes(indexesToUpdate);

    SortedMap<byte[], byte[]> newColumnValues = getColumnsFromBatchUpdate(batchUpdate);
    Map<byte[], Cell> oldColumnCells = super.getFull(batchUpdate.getRow(),
        neededColumns, HConstants.LATEST_TIMESTAMP, null);

    // Handle delete batch updates. Go back and get the next older values
    for (BatchOperation op : batchUpdate) {
      if (!op.isPut()) {
        Cell current = oldColumnCells.get(op.getColumn());
        if (current != null) {
          Cell[] older = super.get(batchUpdate.getRow(), op.getColumn(),
              current.getTimestamp(), 1);
          if (older != null && older.length > 0) {
            newColumnValues.put(op.getColumn(), older[0].getValue());
          }
        }
      }
    }

    // Add the old values to the new if they are not there
    for (Entry<byte[], Cell> oldEntry : oldColumnCells.entrySet()) {
      if (!newColumnValues.containsKey(oldEntry.getKey())) {
        newColumnValues.put(oldEntry.getKey(), oldEntry.getValue().getValue());
      }
    }

    Iterator<IndexSpecification> indexIterator = indexesToUpdate.iterator();
    while (indexIterator.hasNext()) {
      IndexSpecification indexSpec = indexIterator.next();
      if (!doesApplyToIndex(indexSpec, newColumnValues)) {
        indexIterator.remove();
      }
    }

    SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldColumnCells);

    for (IndexSpecification indexSpec : indexesToUpdate) {
      removeOldIndexEntry(indexSpec, batchUpdate.getRow(), oldColumnValues);
      updateIndex(indexSpec, batchUpdate.getRow(), newColumnValues);
    }
  }

  /** Return the columns needed for the update. */
  private Set<byte[]> getColumnsForIndexes(Collection<IndexSpecification> indexes) {
    Set<byte[]> neededColumns = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
    for (IndexSpecification indexSpec : indexes) {
      for (byte[] col : indexSpec.getAllColumns()) {
        neededColumns.add(col);
      }
    }
    return neededColumns;
  }

  private void removeOldIndexEntry(IndexSpecification indexSpec, byte[] row,
      SortedMap<byte[], byte[]> oldColumnValues) throws IOException {
    for (byte[] indexedCol : indexSpec.getIndexedColumns()) {
      if (!oldColumnValues.containsKey(indexedCol)) {
        LOG.debug("Index [" + indexSpec.getIndexId()
            + "] not trying to remove old entry for row ["
            + Bytes.toString(row) + "] because col ["
            + Bytes.toString(indexedCol) + "] is missing");
        return;
      }
    }

    byte[] oldIndexRow = indexSpec.getKeyGenerator().createIndexKey(row,
        oldColumnValues);
    LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry ["
        + Bytes.toString(oldIndexRow) + "]");
    getIndexTable(indexSpec).deleteAll(oldIndexRow);
  }

  private SortedMap<byte[], byte[]> getColumnsFromBatchUpdate(BatchUpdate b) {
    SortedMap<byte[], byte[]> columnValues = new TreeMap<byte[], byte[]>(
        Bytes.BYTES_COMPARATOR);
    for (BatchOperation op : b) {
      if (op.isPut()) {
        columnValues.put(op.getColumn(), op.getValue());
      }
    }
    return columnValues;
  }

  /** Ask if this update *could* apply to the index. It may not actually apply
   * if some of the columns needed are missing.
   *
   * @param indexSpec
   * @param b
   * @return true if the update might apply to the index.
   */
  private boolean possiblyAppliesToIndex(IndexSpecification indexSpec, BatchUpdate b) {
    for (BatchOperation op : b) {
      if (indexSpec.containsColumn(op.getColumn())) {
        return true;
      }
    }
    return false;
  }

  /** Ask if this update does apply to the index.
   *
   * @param indexSpec
   * @param columnValues
   * @return true if the update applies to the index.
   */
  private boolean doesApplyToIndex(IndexSpecification indexSpec,
      SortedMap<byte[], byte[]> columnValues) {
    for (byte[] neededCol : indexSpec.getIndexedColumns()) {
      if (!columnValues.containsKey(neededCol)) {
        LOG.debug("Index [" + indexSpec.getIndexId() + "] can't be updated because ["
            + Bytes.toString(neededCol) + "] is missing");
        return false;
      }
    }
    return true;
  }

  private void updateIndex(IndexSpecification indexSpec, byte[] row,
      SortedMap<byte[], byte[]> columnValues) throws IOException {
    BatchUpdate indexUpdate = createIndexUpdate(indexSpec, row, columnValues);
    getIndexTable(indexSpec).commit(indexUpdate);
    LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry ["
        + Bytes.toString(indexUpdate.getRow()) + "] for row ["
        + Bytes.toString(row) + "]");
  }

  private BatchUpdate createIndexUpdate(IndexSpecification indexSpec,
      byte[] row, SortedMap<byte[], byte[]> columnValues) {
    byte[] indexRow = indexSpec.getKeyGenerator().createIndexKey(row,
        columnValues);
    BatchUpdate update = new BatchUpdate(indexRow);

    update.put(IndexedTable.INDEX_BASE_ROW_COLUMN, row);

    for (byte[] col : indexSpec.getIndexedColumns()) {
      byte[] val = columnValues.get(col);
      if (val == null) {
        throw new RuntimeException("Unexpected missing column value. ["
            + Bytes.toString(col) + "]");
      }
      update.put(col, val);
    }

    for (byte[] col : indexSpec.getAdditionalColumns()) {
      byte[] val = columnValues.get(col);
      if (val != null) {
        update.put(col, val);
      }
    }

    return update;
  }

  @Override
  public void deleteAll(final byte[] row, final long ts, final Integer lockid)
      throws IOException {
    if (getIndexes().size() != 0) {

      // Need all columns
      Set<byte[]> neededColumns = getColumnsForIndexes(getIndexes());

      Map<byte[], Cell> oldColumnCells = super.getFull(row,
          neededColumns, HConstants.LATEST_TIMESTAMP, null);
      SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldColumnCells);

      for (IndexSpecification indexSpec : getIndexes()) {
        removeOldIndexEntry(indexSpec, row, oldColumnValues);
      }

      // Handle if there is still a version visible.
      if (ts != HConstants.LATEST_TIMESTAMP) {
        Map<byte[], Cell> currentColumnCells = super.getFull(row,
            neededColumns, ts, null);
        SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentColumnCells);

        for (IndexSpecification indexSpec : getIndexes()) {
          if (doesApplyToIndex(indexSpec, currentColumnValues)) {
            updateIndex(indexSpec, row, currentColumnValues);
          }
        }
      }
    }
    super.deleteAll(row, ts, lockid);
  }

  private SortedMap<byte[], byte[]> convertToValueMap(Map<byte[], Cell> cellMap) {
    SortedMap<byte[], byte[]> currentColumnValues =
        new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
    for (Entry<byte[], Cell> entry : cellMap.entrySet()) {
      currentColumnValues.put(entry.getKey(), entry.getValue().getValue());
    }
    return currentColumnValues;
  }

  @Override
  public void deleteAll(final byte[] row, byte[] column, final long ts,
      final Integer lockid) throws IOException {
    List<IndexSpecification> indexesToUpdate = new LinkedList<IndexSpecification>();

    for (IndexSpecification indexSpec : getIndexes()) {
      if (indexSpec.containsColumn(column)) {
        indexesToUpdate.add(indexSpec);
      }
    }

    Set<byte[]> neededColumns = getColumnsForIndexes(indexesToUpdate);
    Map<byte[], Cell> oldColumnCells = super.getFull(row,
        neededColumns, HConstants.LATEST_TIMESTAMP, null);
    SortedMap<byte[], byte[]> oldColumnValues = convertToValueMap(oldColumnCells);

    for (IndexSpecification indexSpec : indexesToUpdate) {
      removeOldIndexEntry(indexSpec, row, oldColumnValues);
    }

    // Handle if there is still a version visible.
    if (ts != HConstants.LATEST_TIMESTAMP) {
      Map<byte[], Cell> currentColumnCells = super.getFull(row,
          neededColumns, ts, null);
      SortedMap<byte[], byte[]> currentColumnValues = convertToValueMap(currentColumnCells);

      for (IndexSpecification indexSpec : getIndexes()) {
        if (doesApplyToIndex(indexSpec, currentColumnValues)) {
          updateIndex(indexSpec, row, currentColumnValues);
        }
      }
    }

    super.deleteAll(row, column, ts, lockid);
  }
}

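[Editor's illustration, not part of this commit: the net effect of updateIndexes() when an indexed value changes, expressed as plain client-side HTable operations. All values are hypothetical, and "indexTable" is assumed to be an HTable opened on the index table; IndexedRegion performs the equivalent work server-side.]

    byte[] baseRow = Bytes.toBytes("row1");
    byte[] oldVal = Bytes.toBytes("apple");
    byte[] newVal = Bytes.toBytes("banana");
    byte[] col = Bytes.toBytes("family:a");

    // 1. removeOldIndexEntry(): drop the index row keyed by the old value.
    indexTable.deleteAll(Bytes.add(oldVal, baseRow));

    // 2. updateIndex(): write the index row keyed by the new value, carrying a
    //    pointer back to the base row plus the indexed column itself.
    BatchUpdate update = new BatchUpdate(Bytes.add(newVal, baseRow));
    update.put(IndexedTable.INDEX_BASE_ROW_COLUMN, baseRow);
    update.put(col, newVal);
    indexTable.commit(update);
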
@@ -0,0 +1,66 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.regionserver.tableindexed;

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.ipc.IndexedRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
import org.apache.hadoop.util.Progressable;

/**
 * RegionServer which maintains secondary indexes.
 */
public class IndexedRegionServer extends TransactionalRegionServer implements
    IndexedRegionInterface {

  public IndexedRegionServer(HBaseConfiguration conf) throws IOException {
    this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
        DEFAULT_REGIONSERVER_ADDRESS)), conf);
  }

  public IndexedRegionServer(HServerAddress serverAddress,
      HBaseConfiguration conf) throws IOException {
    super(serverAddress, conf);
  }

  @Override
  protected HRegion instantiateRegion(final HRegionInfo regionInfo)
      throws IOException {
    HRegion r = new IndexedRegion(HTableDescriptor.getTableDir(super
        .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
        .getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
    r.initialize(null, new Progressable() {
      public void progress() {
        addProcessingMessage(regionInfo);
      }
    });
    return r;
  }
}

@@ -73,7 +73,7 @@ import org.apache.hadoop.util.Progressable;
  * will have to consult the transaction log to determine the final decision of
  * the transaction. This is not yet implemented.
  */
-class TransactionalRegion extends HRegion {
+public class TransactionalRegion extends HRegion {

   private static final String LEASE_TIME = "hbase.transaction.leaseTime";
   private static final int DEFAULT_LEASE_TIME = 60 * 1000;

@@ -501,7 +501,7 @@ class TransactionalRegion extends HRegion {
     }

     for (BatchUpdate update : state.getWriteSet()) {
-      super.batchUpdate(update, false); // Don't need to WAL these
+      this.batchUpdate(update, false); // Don't need to WAL these
       // FIXME, maybe should be WALed so we don't need to look so far back.
     }

@@ -527,7 +527,7 @@ public class PerformanceEvaluation implements HConstants {
    * @return Returns zero-prefixed 10-byte wide decimal version of passed
    * number (Does absolute in case number is negative).
    */
-  static byte [] format(final int number) {
+  public static byte [] format(final int number) {
     byte [] b = new byte[10];
     int d = Math.abs(number);
     for (int i = b.length - 1; i >= 0; i--) {

@@ -0,0 +1,131 @@
/**
 * Copyright 2008 The Apache Software Foundation
 *
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.client.tableindexed;

import java.io.IOException;
import java.util.Random;

import junit.framework.Assert;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.PerformanceEvaluation;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer;
import org.apache.hadoop.hbase.util.Bytes;

public class TestIndexedTable extends HBaseClusterTestCase {

  private static final Log LOG = LogFactory.getLog(TestIndexedTable.class);

  private static final String TABLE_NAME = "table1";

  private static final byte[] FAMILY = Bytes.toBytes("family:");
  private static final byte[] COL_A = Bytes.toBytes("family:a");
  private static final String INDEX_COL_A_ASC = "A-Ascending";

  private static final int NUM_ROWS = 10;
  private static final int MAX_VAL = 10000;

  private IndexedTableAdmin admin;
  private IndexedTable table;
  private Random random = new Random();

  /** constructor */
  public TestIndexedTable() {
    conf.set(HConstants.REGION_SERVER_IMPL, IndexedRegionServer.class.getName());
    conf.setInt("hbase.master.info.port", -1);
    conf.setInt("hbase.regionserver.info.port", -1);
  }

  @Override
  protected void setUp() throws Exception {
    super.setUp();

    HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
    desc.addFamily(new HColumnDescriptor(FAMILY));

    // Create a new index that does lexicographic ordering on COL_A
    IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A_ASC,
        COL_A, true);
    desc.addIndex(colAIndex);

    admin = new IndexedTableAdmin(conf);
    admin.createTable(desc);
    table = new IndexedTable(conf, desc.getName());
  }

  private void writeInitialRows() throws IOException {
    for (int i = 0; i < NUM_ROWS; i++) {
      BatchUpdate update = new BatchUpdate(PerformanceEvaluation.format(i));
      byte[] colA = PerformanceEvaluation.format(random.nextInt(MAX_VAL));
      update.put(COL_A, colA);
      table.commit(update);
      LOG.info("Inserted row [" + Bytes.toString(update.getRow()) + "] val: ["
          + Bytes.toString(colA) + "]");
    }
  }

  public void testInitialWrites() throws IOException {
    writeInitialRows();
    assertRowsInOrder(NUM_ROWS);
  }

  private void assertRowsInOrder(int numRowsExpected)
      throws IndexNotFoundException, IOException {
    Scanner scanner = table.getIndexedScanner(INDEX_COL_A_ASC,
        HConstants.EMPTY_START_ROW, null, null, null);
    int numRows = 0;
    byte[] lastColA = null;
    for (RowResult rowResult : scanner) {
      byte[] colA = rowResult.get(COL_A).getValue();
      LOG.info("index scan : row [" + Bytes.toString(rowResult.getRow())
          + "] value [" + Bytes.toString(colA) + "]");
      if (lastColA != null) {
        Assert.assertTrue(Bytes.compareTo(lastColA, colA) <= 0);
      }
      lastColA = colA;
      numRows++;
    }
    Assert.assertEquals(numRowsExpected, numRows);
  }

  public void testMultipleWrites() throws IOException {
    writeInitialRows();
    writeInitialRows(); // Update the rows.
    assertRowsInOrder(NUM_ROWS);
  }

  public void testDelete() throws IOException {
    writeInitialRows();
    // Delete the first row.
    table.deleteAll(PerformanceEvaluation.format(0));

    assertRowsInOrder(NUM_ROWS - 1);
  }
}