From fc56f9ca0df6cadee38a03ce3ab5ab0d86a7261b Mon Sep 17 00:00:00 2001 From: Andrew Kyle Purtell Date: Mon, 17 Nov 2008 18:34:07 +0000 Subject: [PATCH] HBASE-883 Secondary indexes git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@718317 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 1 + .../org/apache/hadoop/hbase/HStoreKey.java | 3 + .../apache/hadoop/hbase/HTableDescriptor.java | 89 ++++- .../hadoop/hbase/WritableComparator.java | 28 ++ .../client/UnmodifyableHTableDescriptor.java | 8 +- .../tableindexed/IndexKeyGenerator.java | 29 ++ .../tableindexed/IndexNotFoundException.java | 45 +++ .../tableindexed/IndexSpecification.java | 200 ++++++++++ .../client/tableindexed/IndexedTable.java | 222 ++++++++++++ .../tableindexed/IndexedTableAdmin.java | 99 +++++ .../ReverseByteArrayComparator.java | 46 +++ .../tableindexed/SimpleIndexKeyGenerator.java | 59 +++ .../hbase/client/tableindexed/package.html | 47 +++ .../hbase/ipc/IndexedRegionInterface.java | 11 + .../tableindexed/IndexedRegion.java | 342 ++++++++++++++++++ .../tableindexed/IndexedRegionServer.java | 66 ++++ .../transactional/TransactionalRegion.java | 4 +- .../hadoop/hbase/PerformanceEvaluation.java | 2 +- .../client/tableindexed/TestIndexedTable.java | 131 +++++++ 19 files changed, 1426 insertions(+), 6 deletions(-) create mode 100644 src/java/org/apache/hadoop/hbase/WritableComparator.java create mode 100644 src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java create mode 100644 src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java create mode 100644 src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java create mode 100644 src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java create mode 100644 src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java create mode 100644 src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java create mode 100644 src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java create mode 100644 src/java/org/apache/hadoop/hbase/client/tableindexed/package.html create mode 100644 src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java create mode 100644 src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java create mode 100644 src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java create mode 100644 src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java diff --git a/CHANGES.txt b/CHANGES.txt index 7092bfe8365..7b69ace271b 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -127,6 +127,7 @@ Release 0.19.0 - Unreleased HBASE-875 Use MurmurHash instead of JenkinsHash [in bloomfilters] (Andrzej Bialecki via Stack) HBASE-625 Metrics support for cluster load history: emissions and graphs + HBASE-883 Secondary indexes (Clint Morgan via Andrew Purtell) OPTIMIZATIONS HBASE-748 Add an efficient way to batch update many rows diff --git a/src/java/org/apache/hadoop/hbase/HStoreKey.java b/src/java/org/apache/hadoop/hbase/HStoreKey.java index 8c8b0a78881..5eac2336734 100644 --- a/src/java/org/apache/hadoop/hbase/HStoreKey.java +++ b/src/java/org/apache/hadoop/hbase/HStoreKey.java @@ -529,6 +529,9 @@ public class HStoreKey implements WritableComparable { if(rowCompare == 0) rowCompare = Bytes.compareTo(keysA[1], KeysB[1]); return rowCompare; + } + if (regionInfo != null && regionInfo.getTableDesc().getRowKeyComparator() != null) { + return 
regionInfo.getTableDesc().getRowKeyComparator().compare(rowA, rowB); } return Bytes.compareTo(rowA, rowB); } diff --git a/src/java/org/apache/hadoop/hbase/HTableDescriptor.java b/src/java/org/apache/hadoop/hbase/HTableDescriptor.java index 66c7aff3d9c..ab55de77346 100644 --- a/src/java/org/apache/hadoop/hbase/HTableDescriptor.java +++ b/src/java/org/apache/hadoop/hbase/HTableDescriptor.java @@ -19,8 +19,12 @@ */ package org.apache.hadoop.hbase; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; import java.io.DataInput; +import java.io.DataInputStream; import java.io.DataOutput; +import java.io.DataOutputStream; import java.io.IOException; import java.util.Collection; import java.util.Collections; @@ -29,8 +33,10 @@ import java.util.Iterator; import java.util.Map; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.ObjectWritable; import org.apache.hadoop.io.WritableComparable; /** @@ -41,7 +47,8 @@ public class HTableDescriptor implements WritableComparable { // Changes prior to version 3 were not recorded here. // Version 3 adds metadata as a map where keys and values are byte[]. - public static final byte TABLE_DESCRIPTOR_VERSION = 3; + // Version 4 adds indexes + public static final byte TABLE_DESCRIPTOR_VERSION = 4; private byte [] name = HConstants.EMPTY_BYTE_ARRAY; private String nameAsString = ""; @@ -66,9 +73,13 @@ public class HTableDescriptor implements WritableComparable { public static final ImmutableBytesWritable IS_ROOT_KEY = new ImmutableBytesWritable(Bytes.toBytes(IS_ROOT)); public static final String IS_META = "IS_META"; + + public static final String ROW_KEY_COMPARATOR = "ROW_KEY_COMPARATOR"; + public static final ImmutableBytesWritable IS_META_KEY = new ImmutableBytesWritable(Bytes.toBytes(IS_META)); + // The below are ugly but better than creating them each time till we // replace booleans being saved as Strings with plain booleans. Need a // migration script to do this. TODO. @@ -90,6 +101,10 @@ public class HTableDescriptor implements WritableComparable { private final Map families = new HashMap(); + // Key is indexId + private final Map indexes = + new HashMap(); + /** * Private constructor used internally creating table descriptors for * catalog tables: e.g. .META. and -ROOT-. @@ -108,6 +123,7 @@ public class HTableDescriptor implements WritableComparable { * catalog tables: e.g. .META. and -ROOT-. 
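+   * @param indexes IndexSpecifications for the table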
*/ protected HTableDescriptor(final byte [] name, HColumnDescriptor[] families, + Collection indexes, Map values) { this.name = name.clone(); this.nameAsString = Bytes.toString(this.name); @@ -115,6 +131,9 @@ public class HTableDescriptor implements WritableComparable { for(HColumnDescriptor descriptor : families) { this.families.put(Bytes.mapKey(descriptor.getName()), descriptor); } + for(IndexSpecification index : indexes) { + this.indexes.put(index.getIndexId(), index); + } for (Map.Entry entry: values.entrySet()) { this.values.put(entry.getKey(), entry.getValue()); @@ -404,7 +423,7 @@ public class HTableDescriptor implements WritableComparable { return Integer.valueOf(Bytes.toString(value)).intValue(); return DEFAULT_MEMCACHE_FLUSH_SIZE; } - + /** * @param memcacheFlushSize memory cache flush size for each hregion */ @@ -412,6 +431,58 @@ public class HTableDescriptor implements WritableComparable { setValue(MEMCACHE_FLUSHSIZE_KEY, Bytes.toBytes(Integer.toString(memcacheFlushSize))); } + + + public void setRowKeyComparator(WritableComparator newComparator) { + if (newComparator == null) { + return; + } + + ByteArrayOutputStream bos = new ByteArrayOutputStream(); + DataOutputStream dos = new DataOutputStream(bos); + HBaseConfiguration conf = new HBaseConfiguration(); + try { + ObjectWritable.writeObject(dos, newComparator, WritableComparator.class, conf); + dos.close(); + } catch (IOException e) { + throw new RuntimeException(e); + } + setValue(ROW_KEY_COMPARATOR.getBytes(), bos.toByteArray()); + this.comparator = newComparator; + } + + private WritableComparator comparator = null; + public WritableComparator getRowKeyComparator() { + if (comparator != null) { + return comparator; + } + byte[] bytes = getValue(ROW_KEY_COMPARATOR.getBytes()); + if (bytes == null) { + return null; + } + ByteArrayInputStream bis = new ByteArrayInputStream(bytes); + DataInputStream in = new DataInputStream(bis); + HBaseConfiguration conf = new HBaseConfiguration(); + try { + comparator = (WritableComparator) ObjectWritable.readObject(in, conf); + } catch (IOException e) { + throw new RuntimeException(e); + } + return comparator; + } + + + public Collection getIndexes() { + return indexes.values(); + } + + public IndexSpecification getIndex(String indexId) { + return indexes.get(indexId); + } + + public void addIndex(IndexSpecification index) { + indexes.put(index.getIndexId(), index); + } /** * Adds a column family. 
@@ -519,6 +590,16 @@ public class HTableDescriptor implements WritableComparable { c.readFields(in); families.put(Bytes.mapKey(c.getName()), c); } + indexes.clear(); + if (version < 4) { + return; + } + int numIndexes = in.readInt(); + for (int i = 0; i < numIndexes; i++) { + IndexSpecification index = new IndexSpecification(); + index.readFields(in); + addIndex(index); + } } public void write(DataOutput out) throws IOException { @@ -538,6 +619,10 @@ public class HTableDescriptor implements WritableComparable { HColumnDescriptor family = it.next(); family.write(out); } + out.writeInt(indexes.size()); + for(IndexSpecification index : indexes.values()) { + index.write(out); + } } // Comparable diff --git a/src/java/org/apache/hadoop/hbase/WritableComparator.java b/src/java/org/apache/hadoop/hbase/WritableComparator.java new file mode 100644 index 00000000000..b765d681e8a --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/WritableComparator.java @@ -0,0 +1,28 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase; + +import java.util.Comparator; + +import org.apache.hadoop.io.Writable; + +public interface WritableComparator extends Writable, Comparator { +// No methods, just bring the two interfaces together +} diff --git a/src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java b/src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java index 4a6b714ab49..bad53242a67 100644 --- a/src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java +++ b/src/java/org/apache/hadoop/hbase/client/UnmodifyableHTableDescriptor.java @@ -22,6 +22,7 @@ package org.apache.hadoop.hbase.client; import org.apache.hadoop.hbase.HColumnDescriptor; import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification; /** * Read-only table descriptor. 
@@ -37,7 +38,7 @@ public class UnmodifyableHTableDescriptor extends HTableDescriptor { * @param desc */ UnmodifyableHTableDescriptor(final HTableDescriptor desc) { - super(desc.getName(), getUnmodifyableFamilies(desc), desc.getValues()); + super(desc.getName(), getUnmodifyableFamilies(desc), desc.getIndexes(), desc.getValues()); } /* @@ -108,4 +109,9 @@ public class UnmodifyableHTableDescriptor extends HTableDescriptor { public void setMemcacheFlushSize(int memcacheFlushSize) { throw new UnsupportedOperationException("HTableDescriptor is read-only"); } + + @Override + public void addIndex(IndexSpecification index) { + throw new UnsupportedOperationException("HTableDescriptor is read-only"); + } } diff --git a/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java new file mode 100644 index 00000000000..dae811ee3b0 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java @@ -0,0 +1,29 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.util.Map; + +import org.apache.hadoop.io.Writable; + +public interface IndexKeyGenerator extends Writable { + + byte [] createIndexKey(byte [] rowKey, Map columns); +} diff --git a/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java new file mode 100644 index 00000000000..6e6d865bdd3 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java @@ -0,0 +1,45 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.IOException; + +/** + * Thrown when asking for an index that does not exist. 
+ */ +public class IndexNotFoundException extends IOException { + + public IndexNotFoundException() { + super(); + } + + public IndexNotFoundException(String arg0) { + super(arg0); + } + + public IndexNotFoundException(Throwable arg0) { + super(arg0.getMessage()); + } + + public IndexNotFoundException(String arg0, Throwable arg1) { + super(arg0+arg1.getMessage()); + } + +} diff --git a/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java new file mode 100644 index 00000000000..a32309287e5 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java @@ -0,0 +1,200 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.WritableComparator; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.ObjectWritable; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableComparable; + +/** Holds the specification for a single secondary index. */ +public class IndexSpecification implements Writable { + + // Columns that are indexed (part of the indexRowKey) + private byte[][] indexedColumns; + + // Constructs the + private IndexKeyGenerator keyGenerator; + + private WritableComparator keyComparator; + + // Additional columns mapped into the indexed row. These will be available for + // filters when scanning the index. + private byte[][] additionalColumns; + + private byte[][] allColumns; + + // Id of this index, unique within a table. + private String indexId; + + /** Construct an "simple" index spec for a single column. */ + public IndexSpecification(String indexId, byte[] indexedColumn, + boolean acending) { + this(indexId, new byte[][] { indexedColumn }, null, + new SimpleIndexKeyGenerator(indexedColumn), acending == true ? null + : new ReverseByteArrayComparator()); + } + + /** + * Construct an index spec by specifying everything. 
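+   * <p>
+   * An illustrative sketch (the column names here are hypothetical): a
+   * descending index over family:a, with family:b mapped into the index
+   * table so it is available to filters:
+   * <pre>
+   * new IndexSpecification("byColA",
+   *     new byte[][] { Bytes.toBytes("family:a") },
+   *     new byte[][] { Bytes.toBytes("family:b") },
+   *     new SimpleIndexKeyGenerator(Bytes.toBytes("family:a")),
+   *     new ReverseByteArrayComparator());
+   * </pre>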
+ * + * @param indexId + * @param indexedColumns + * @param additionalColumns + * @param keyGenerator + * @param keyComparator + */ + public IndexSpecification(String indexId, byte[][] indexedColumns, + byte[][] additionalColumns, IndexKeyGenerator keyGenerator, + WritableComparator keyComparator) { + this.indexId = indexId; + this.indexedColumns = indexedColumns; + this.additionalColumns = additionalColumns; + this.keyGenerator = keyGenerator; + this.keyComparator = keyComparator; + this.makeAllColumns(); + } + + public IndexSpecification() { + // For writable + } + + private void makeAllColumns() { + this.allColumns = new byte[indexedColumns.length + + (additionalColumns == null ? 0 : additionalColumns.length)][]; + System.arraycopy(indexedColumns, 0, allColumns, 0, indexedColumns.length); + if (additionalColumns != null) { + System.arraycopy(additionalColumns, 0, allColumns, indexedColumns.length, + additionalColumns.length); + } + } + + /** + * Get the indexedColumns. + * + * @return Return the indexedColumns. + */ + public byte[][] getIndexedColumns() { + return indexedColumns; + } + + /** + * Get the keyGenerator. + * + * @return Return the keyGenerator. + */ + public IndexKeyGenerator getKeyGenerator() { + return keyGenerator; + } + + /** + * Get the keyComparator. + * + * @return Return the keyComparator. + */ + public WritableComparator getKeyComparator() { + return keyComparator; + } + + /** + * Get the additionalColumns. + * + * @return Return the additionalColumns. + */ + public byte[][] getAdditionalColumns() { + return additionalColumns; + } + + /** + * Get the indexId. + * + * @return Return the indexId. + */ + public String getIndexId() { + return indexId; + } + + public byte[][] getAllColumns() { + return allColumns; + } + + public boolean containsColumn(byte[] column) { + for (byte[] col : allColumns) { + if (Bytes.equals(column, col)) { + return true; + } + } + return false; + } + + public byte[] getIndexedTableName(byte[] baseTableName) { + return Bytes.add(baseTableName, Bytes.toBytes("-" + indexId)); + } + + /** {@inheritDoc} */ + @SuppressWarnings("unchecked") + public void readFields(DataInput in) throws IOException { + indexId = in.readUTF(); + int numIndexedCols = in.readInt(); + indexedColumns = new byte[numIndexedCols][]; + for (int i = 0; i < numIndexedCols; i++) { + indexedColumns[i] = Bytes.readByteArray(in); + } + int numAdditionalCols = in.readInt(); + additionalColumns = new byte[numAdditionalCols][]; + for (int i = 0; i < numAdditionalCols; i++) { + additionalColumns[i] = Bytes.readByteArray(in); + } + makeAllColumns(); + HBaseConfiguration conf = new HBaseConfiguration(); + keyGenerator = (IndexKeyGenerator) ObjectWritable.readObject(in, conf); + keyComparator = (WritableComparator) ObjectWritable.readObject(in, + conf); + } + + /** {@inheritDoc} */ + public void write(DataOutput out) throws IOException { + out.writeUTF(indexId); + out.writeInt(indexedColumns.length); + for (byte[] col : indexedColumns) { + Bytes.writeByteArray(out, col); + } + if (additionalColumns != null) { + out.writeInt(additionalColumns.length); + for (byte[] col : additionalColumns) { + Bytes.writeByteArray(out, col); + } + } else { + out.writeInt(0); + } + HBaseConfiguration conf = new HBaseConfiguration(); + ObjectWritable + .writeObject(out, keyGenerator, IndexKeyGenerator.class, conf); + ObjectWritable.writeObject(out, keyComparator, WritableComparable.class, + conf); + } + +} diff --git a/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java 
b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java new file mode 100644 index 00000000000..d3cfaf6ef35 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java @@ -0,0 +1,222 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scanner; +import org.apache.hadoop.hbase.client.transactional.TransactionalTable; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.util.Bytes; + +/** HTable extended with indexed support. */ +public class IndexedTable extends TransactionalTable { + + // FIXME, these belong elsewhere + public static final byte[] INDEX_COL_FAMILY_NAME = Bytes.toBytes("__INDEX__"); + public static final byte[] INDEX_COL_FAMILY = Bytes.add( + INDEX_COL_FAMILY_NAME, new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER }); + public static final byte[] INDEX_BASE_ROW_COLUMN = Bytes.add( + INDEX_COL_FAMILY, Bytes.toBytes("ROW")); + + private static final Log LOG = LogFactory.getLog(IndexedTable.class); + + private Map indexIdToTable = new HashMap(); + + /** {@inheritDoc} */ + public IndexedTable(final HBaseConfiguration conf, final byte[] tableName) + throws IOException { + super(conf, tableName); + + for (IndexSpecification spec : super.getTableDescriptor().getIndexes()) { + indexIdToTable.put(spec.getIndexId(), new HTable(conf, spec + .getIndexedTableName(tableName))); + } + } + + /** + * Open up an indexed scanner. Results will come back in the indexed order, + * but will contain RowResults from the original table. + * + * @param indexId the id of the index to use + * @param indexStartRow (created from the IndexKeyGenerator) + * @param indexColumns in the index table + * @param indexFilter filter to run on the index'ed table. This can only use + * columns that have been added to the index. 
+ * @param baseColumns from the original table + * @return scanner + * @throws IOException + * @throws IndexNotFoundException + */ + public Scanner getIndexedScanner(String indexId, final byte[] indexStartRow, + byte[][] indexColumns, final RowFilterInterface indexFilter, + final byte[][] baseColumns) throws IOException, IndexNotFoundException { + IndexSpecification indexSpec = super.getTableDescriptor().getIndex(indexId); + if (indexSpec == null) { + throw new IndexNotFoundException("Index " + indexId + + " not defined in table " + + super.getTableDescriptor().getNameAsString()); + } + verifyIndexColumns(indexColumns, indexSpec); + // TODO, verify/remove index columns from baseColumns + + HTable indexTable = indexIdToTable.get(indexId); + + byte[][] allIndexColumns; + if (indexColumns != null) { + allIndexColumns = new byte[indexColumns.length + 1][]; + System + .arraycopy(indexColumns, 0, allIndexColumns, 0, indexColumns.length); + allIndexColumns[indexColumns.length] = INDEX_BASE_ROW_COLUMN; + } else { + byte[][] allColumns = indexSpec.getAllColumns(); + allIndexColumns = new byte[allColumns.length + 1][]; + System.arraycopy(allColumns, 0, allIndexColumns, 0, allColumns.length); + allIndexColumns[allColumns.length] = INDEX_BASE_ROW_COLUMN; + } + + Scanner indexScanner = indexTable.getScanner(allIndexColumns, + indexStartRow, indexFilter); + + return new ScannerWrapper(indexScanner, baseColumns); + } + + private void verifyIndexColumns(byte[][] requestedColumns, + IndexSpecification indexSpec) { + if (requestedColumns == null) { + return; + } + for (byte[] requestedColumn : requestedColumns) { + boolean found = false; + for (byte[] indexColumn : indexSpec.getAllColumns()) { + if (Bytes.equals(requestedColumn, indexColumn)) { + found = true; + break; + } + } + if (!found) { + throw new RuntimeException("Column [" + Bytes.toString(requestedColumn) + + "] not in index " + indexSpec.getIndexId()); + } + } + } + + private class ScannerWrapper implements Scanner { + + private Scanner indexScanner; + private byte[][] columns; + + public ScannerWrapper(Scanner indexScanner, byte[][] columns) { + this.indexScanner = indexScanner; + this.columns = columns; + } + + /** {@inheritDoc} */ + public RowResult next() throws IOException { + RowResult[] result = next(1); + if (result == null || result.length < 1) + return null; + return result[0]; + } + + /** {@inheritDoc} */ + public RowResult[] next(int nbRows) throws IOException { + RowResult[] indexResult = indexScanner.next(nbRows); + if (indexResult == null) { + return null; + } + RowResult[] result = new RowResult[indexResult.length]; + for (int i = 0; i < indexResult.length; i++) { + RowResult row = indexResult[i]; + byte[] baseRow = row.get(INDEX_BASE_ROW_COLUMN).getValue(); + LOG.debug("next index row [" + Bytes.toString(row.getRow()) + + "] -> base row [" + Bytes.toString(baseRow) + "]"); + HbaseMapWritable colValues = + new HbaseMapWritable(); + if (columns != null && columns.length > 0) { + LOG.debug("Going to base table for remaining columns"); + RowResult baseResult = IndexedTable.this.getRow(baseRow, columns); + colValues.putAll(baseResult); + } + for (Entry entry : row.entrySet()) { + byte[] col = entry.getKey(); + if (HStoreKey.matchingFamily(INDEX_COL_FAMILY_NAME, col)) { + continue; + } + colValues.put(col, entry.getValue()); + } + result[i] = new RowResult(baseRow, colValues); + } + return result; + } + + /** {@inheritDoc} */ + public void close() { + indexScanner.close(); + } + + /** {@inheritDoc} */ + public Iterator iterator() { + 
// FIXME, copied from HTable.ClientScanner. Extract this to common base + // class? + return new Iterator() { + RowResult next = null; + + public boolean hasNext() { + if (next == null) { + try { + next = ScannerWrapper.this.next(); + return next != null; + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return true; + } + + public RowResult next() { + if (!hasNext()) { + return null; + } + RowResult temp = next; + next = null; + return temp; + } + + public void remove() { + throw new UnsupportedOperationException(); + } + }; + } + + } +} diff --git a/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java new file mode 100644 index 00000000000..4e26f89e2d7 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java @@ -0,0 +1,99 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.IOException; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.ColumnNameParseException; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.MasterNotRunningException; +import org.apache.hadoop.hbase.TableExistsException; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Extension of HBaseAdmin that creates indexed tables. + * + */ +public class IndexedTableAdmin extends HBaseAdmin { + + /** + * Constructor + * + * @param conf Configuration object + * @throws MasterNotRunningException + */ + public IndexedTableAdmin(HBaseConfiguration conf) + throws MasterNotRunningException { + super(conf); + } + + /** + * Creates a new table + * + * @param desc table descriptor for table + * + * @throws IllegalArgumentException if the table name is reserved + * @throws MasterNotRunningException if master is not running + * @throws TableExistsException if table already exists (If concurrent + * threads, the table may have been created between test-for-existence and + * attempt-at-creation). 
+ * @throws IOException + */ + @Override + public void createTable(HTableDescriptor desc) throws IOException { + super.createTable(desc); + this.createIndexTables(desc); + } + + private void createIndexTables(HTableDescriptor tableDesc) throws IOException { + byte[] baseTableName = tableDesc.getName(); + for (IndexSpecification indexSpec : tableDesc.getIndexes()) { + HTableDescriptor indexTableDesc = createIndexTableDesc(baseTableName, + indexSpec); + super.createTable(indexTableDesc); + } + } + + private HTableDescriptor createIndexTableDesc(byte[] baseTableName, + IndexSpecification indexSpec) throws ColumnNameParseException { + HTableDescriptor indexTableDesc = new HTableDescriptor(indexSpec + .getIndexedTableName(baseTableName)); + Set families = new TreeSet(Bytes.BYTES_COMPARATOR); + families.add(IndexedTable.INDEX_COL_FAMILY); + for (byte[] column : indexSpec.getAllColumns()) { + families.add(Bytes.add(HStoreKey.getFamily(column), + new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER })); + } + + for (byte[] colFamily : families) { + indexTableDesc.addFamily(new HColumnDescriptor(colFamily)); + } + + indexTableDesc.setRowKeyComparator(indexSpec.getKeyComparator()); + + return indexTableDesc; + } +} diff --git a/src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java b/src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java new file mode 100644 index 00000000000..8af7b81d552 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/tableindexed/ReverseByteArrayComparator.java @@ -0,0 +1,46 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.hbase.WritableComparator; +import org.apache.hadoop.hbase.util.Bytes; + +public class ReverseByteArrayComparator implements WritableComparator { + + /** {@inheritDoc} */ + public int compare(byte[] o1, byte[] o2) { + return Bytes.compareTo(o2, o1); + } + + + /** {@inheritDoc} */ + public void readFields(DataInput arg0) throws IOException { + // Nothing + } + + /** {@inheritDoc} */ + public void write(DataOutput arg0) throws IOException { + // Nothing + } +} diff --git a/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java b/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java new file mode 100644 index 00000000000..49694177a70 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java @@ -0,0 +1,59 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.Map; + +import org.apache.hadoop.hbase.util.Bytes; + +/** Creates indexed keys for a single column.... + * + */ +public class SimpleIndexKeyGenerator implements IndexKeyGenerator { + + private byte [] column; + + public SimpleIndexKeyGenerator(byte [] column) { + this.column = column; + } + + public SimpleIndexKeyGenerator() { + // For Writable + } + + /** {@inheritDoc} */ + public byte[] createIndexKey(byte[] rowKey, Map columns) { + return Bytes.add(columns.get(column), rowKey); + } + + /** {@inheritDoc} */ + public void readFields(DataInput in) throws IOException { + column = Bytes.readByteArray(in); + } + + /** {@inheritDoc} */ + public void write(DataOutput out) throws IOException { + Bytes.writeByteArray(out, column); + } + +} diff --git a/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html b/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html new file mode 100644 index 00000000000..b4de0989628 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html @@ -0,0 +1,47 @@ + + + + + + + + +This package provides support for secondary indexing by maintaining a separate, "index", table for each index. + +The IndexSpecification class provides the metadata for the index. This includes: +
+<ul>
+<li>the columns that contribute to the index key,</li>
+<li>additional columns to put in the index table (and thus made available to filters when scanning the index table),</li>
+<li>an IndexKeyGenerator which constructs the index-row-key from the indexed column(s) and the original row key, and</li>
+<li>(optionally) a custom key comparator for the indexed table, which allows, for example, an index over a deserialized column value.</li>
+</ul>
+
+IndexSpecifications can be added to a table's metadata (HTableDescriptor) before the table is constructed.
+Afterwards, updates and deletes to the original table will trigger updates to the indexes, and
+the indexes can be scanned using the API on IndexedTable.
+
+For a simple example, look at the unit test in org.apache.hadoop.hbase.client.tableindexed; a condensed
+sketch follows.
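+<p>
+A condensed sketch of the same flow (the table, family, and index names here
+are hypothetical): declare the index on the descriptor, create the table
+through IndexedTableAdmin, then scan in index order through IndexedTable.
+</p>
+<pre>
+  HTableDescriptor desc = new HTableDescriptor("myTable");
+  desc.addFamily(new HColumnDescriptor(Bytes.toBytes("family:")));
+  // Single-column, ascending index on family:a.
+  desc.addIndex(new IndexSpecification("byColA", Bytes.toBytes("family:a"), true));
+
+  IndexedTableAdmin admin = new IndexedTableAdmin(conf);
+  admin.createTable(desc); // also creates the index table "myTable-byColA"
+
+  IndexedTable table = new IndexedTable(conf, Bytes.toBytes("myTable"));
+  // Rows come back in index order, carrying values from the base table.
+  Scanner scanner = table.getIndexedScanner("byColA",
+      HConstants.EMPTY_START_ROW, null, null, null);
+</pre>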

    To enable the indexing, modify hbase-site.xml to turn on the +IndexedRegionServer. This is done by setting +hbase.regionserver.class to +org.apache.hadoop.hbase.ipc.IndexedRegionInterface and +hbase.regionserver.impl to +org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer + + + diff --git a/src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java new file mode 100644 index 00000000000..6f5bb0ba916 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/ipc/IndexedRegionInterface.java @@ -0,0 +1,11 @@ +/* + * $Id$ + * Created on Sep 10, 2008 + * + */ +package org.apache.hadoop.hbase.ipc; + +/** Interface for the indexed region server. */ +public interface IndexedRegionInterface extends TransactionalRegionInterface { + // No methods for now... +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java new file mode 100644 index 00000000000..cac9266a625 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegion.java @@ -0,0 +1,342 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.tableindexed; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.Map.Entry; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification; +import org.apache.hadoop.hbase.client.tableindexed.IndexedTable; +import org.apache.hadoop.hbase.io.BatchOperation; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HLog; +import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion; +import org.apache.hadoop.hbase.util.Bytes; + +class IndexedRegion extends TransactionalRegion { + + private static final Log LOG = LogFactory.getLog(IndexedRegion.class); + + private final HBaseConfiguration conf; + private Map indexSpecToTable = new HashMap(); + + public IndexedRegion(final Path basedir, final HLog log, final FileSystem fs, + final HBaseConfiguration conf, final HRegionInfo regionInfo, + final FlushRequester flushListener) { + super(basedir, log, fs, conf, regionInfo, flushListener); + this.conf = conf; + } + + private synchronized HTable getIndexTable(IndexSpecification index) + throws IOException { + HTable indexTable = indexSpecToTable.get(index); + if (indexTable == null) { + indexTable = new HTable(conf, index.getIndexedTableName(super + .getRegionInfo().getTableDesc().getName())); + indexSpecToTable.put(index, indexTable); + } + return indexTable; + } + + private Collection getIndexes() { + return super.getRegionInfo().getTableDesc().getIndexes(); + } + + /** + * @param batchUpdate + * @param lockid + * @param writeToWAL if true, then we write this update to the log + * @throws IOException + */ + @Override + public void batchUpdate(BatchUpdate batchUpdate, Integer lockid, boolean writeToWAL) + throws IOException { + updateIndexes(batchUpdate); // Do this first because will want to see the old row + super.batchUpdate(batchUpdate, lockid, writeToWAL); + } + + private void updateIndexes(BatchUpdate batchUpdate) throws IOException { + List indexesToUpdate = new LinkedList(); + + // Find the indexes we need to update + for (IndexSpecification index : getIndexes()) { + if (possiblyAppliesToIndex(index, batchUpdate)) { + indexesToUpdate.add(index); + } + } + + if (indexesToUpdate.size() == 0) { + return; + } + + Set neededColumns = getColumnsForIndexes(indexesToUpdate); + + SortedMap newColumnValues = getColumnsFromBatchUpdate(batchUpdate); + Map oldColumnCells = super.getFull(batchUpdate.getRow(), + neededColumns, HConstants.LATEST_TIMESTAMP, null); + + // Handle delete batch updates. 
Go back and get the next older values + for (BatchOperation op : batchUpdate) { + if (!op.isPut()) { + Cell current = oldColumnCells.get(op.getColumn()); + if (current != null) { + Cell [] older = super.get(batchUpdate.getRow(), op.getColumn(), current.getTimestamp(), 1); + if (older != null && older.length > 0) { + newColumnValues.put(op.getColumn(), older[0].getValue()); + } + } + } + } + + // Add the old values to the new if they are not there + for (Entry oldEntry : oldColumnCells.entrySet()) { + if (!newColumnValues.containsKey(oldEntry.getKey())) { + newColumnValues.put(oldEntry.getKey(), oldEntry.getValue().getValue()); + } + } + + + + Iterator indexIterator = indexesToUpdate.iterator(); + while (indexIterator.hasNext()) { + IndexSpecification indexSpec = indexIterator.next(); + if (!doesApplyToIndex(indexSpec, newColumnValues)) { + indexIterator.remove(); + } + } + + SortedMap oldColumnValues = convertToValueMap(oldColumnCells); + + for (IndexSpecification indexSpec : indexesToUpdate) { + removeOldIndexEntry(indexSpec, batchUpdate.getRow(), oldColumnValues); + updateIndex(indexSpec, batchUpdate.getRow(), newColumnValues); + } + } + + /** Return the columns needed for the update. */ + private Set getColumnsForIndexes(Collection indexes) { + Set neededColumns = new TreeSet(Bytes.BYTES_COMPARATOR); + for (IndexSpecification indexSpec : indexes) { + for (byte[] col : indexSpec.getAllColumns()) { + neededColumns.add(col); + } + } + return neededColumns; + } + + private void removeOldIndexEntry(IndexSpecification indexSpec, byte[] row, + SortedMap oldColumnValues) throws IOException { + for (byte[] indexedCol : indexSpec.getIndexedColumns()) { + if (!oldColumnValues.containsKey(indexedCol)) { + LOG.debug("Index [" + indexSpec.getIndexId() + + "] not trying to remove old entry for row [" + + Bytes.toString(row) + "] because col [" + + Bytes.toString(indexedCol) + "] is missing"); + return; + } + } + + byte[] oldIndexRow = indexSpec.getKeyGenerator().createIndexKey(row, + oldColumnValues); + LOG.debug("Index [" + indexSpec.getIndexId() + "] removing old entry [" + + Bytes.toString(oldIndexRow) + "]"); + getIndexTable(indexSpec).deleteAll(oldIndexRow); + } + + private SortedMap getColumnsFromBatchUpdate(BatchUpdate b) { + SortedMap columnValues = new TreeMap( + Bytes.BYTES_COMPARATOR); + for (BatchOperation op : b) { + if (op.isPut()) { + columnValues.put(op.getColumn(), op.getValue()); + } + } + return columnValues; + } + + /** Ask if this update *could* apply to the index. It may actually apply if some of the columns needed are missing. + * + * @param indexSpec + * @param b + * @return true if possibly apply. + */ + private boolean possiblyAppliesToIndex(IndexSpecification indexSpec, BatchUpdate b) { + for (BatchOperation op : b) { + if (indexSpec.containsColumn(op.getColumn())) { + return true; + } + } + return false; + } + + /** Ask if this update does apply to the index. + * + * @param indexSpec + * @param b + * @return true if possibly apply. + */ + private boolean doesApplyToIndex(IndexSpecification indexSpec, SortedMap columnValues) { + + for (byte [] neededCol : indexSpec.getIndexedColumns()) { + if (! 
columnValues.containsKey(neededCol)) { + LOG.debug("Index [" + indexSpec.getIndexId() + "] can't be updated because [" + + Bytes.toString(neededCol) + "] is missing"); + return false; + } + } + return true; + } + + private void updateIndex(IndexSpecification indexSpec, byte[] row, + SortedMap columnValues) throws IOException { + BatchUpdate indexUpdate = createIndexUpdate(indexSpec, row, columnValues); + getIndexTable(indexSpec).commit(indexUpdate); + LOG.debug("Index [" + indexSpec.getIndexId() + "] adding new entry [" + + Bytes.toString(indexUpdate.getRow()) + "] for row [" + + Bytes.toString(row) + "]"); + + } + + private BatchUpdate createIndexUpdate(IndexSpecification indexSpec, + byte[] row, SortedMap columnValues) { + byte[] indexRow = indexSpec.getKeyGenerator().createIndexKey(row, + columnValues); + BatchUpdate update = new BatchUpdate(indexRow); + + update.put(IndexedTable.INDEX_BASE_ROW_COLUMN, row); + + for (byte[] col : indexSpec.getIndexedColumns()) { + byte[] val = columnValues.get(col); + if (val == null) { + throw new RuntimeException("Unexpected missing column value. ["+Bytes.toString(col)+"]"); + } + update.put(col, val); + } + + for (byte [] col : indexSpec.getAdditionalColumns()) { + byte[] val = columnValues.get(col); + if (val != null) { + update.put(col, val); + } + } + + return update; + } + + @Override + public void deleteAll(final byte[] row, final long ts, final Integer lockid) + throws IOException { + + if (getIndexes().size() != 0) { + + // Need all columns + Set neededColumns = getColumnsForIndexes(getIndexes()); + + Map oldColumnCells = super.getFull(row, + neededColumns, HConstants.LATEST_TIMESTAMP, null); + SortedMap oldColumnValues = convertToValueMap(oldColumnCells); + + + for (IndexSpecification indexSpec : getIndexes()) { + removeOldIndexEntry(indexSpec, row, oldColumnValues); + } + + // Handle if there is still a version visible. + if (ts != HConstants.LATEST_TIMESTAMP) { + Map currentColumnCells = super.getFull(row, + neededColumns, ts, null); + SortedMap currentColumnValues = convertToValueMap(currentColumnCells); + + for (IndexSpecification indexSpec : getIndexes()) { + if (doesApplyToIndex(indexSpec, currentColumnValues)) { + updateIndex(indexSpec, row, currentColumnValues); + } + } + } + } + super.deleteAll(row, ts, lockid); + } + + private SortedMap convertToValueMap( + Map cellMap) { + SortedMap currentColumnValues = new TreeMap(Bytes.BYTES_COMPARATOR); + for(Entry entry : cellMap.entrySet()) { + currentColumnValues.put(entry.getKey(), entry.getValue().getValue()); + } + return currentColumnValues; + } + + @Override + public void deleteAll(final byte[] row, byte[] column, final long ts, + final Integer lockid) throws IOException { + List indexesToUpdate = new LinkedList(); + + for(IndexSpecification indexSpec : getIndexes()) { + if (indexSpec.containsColumn(column)) { + indexesToUpdate.add(indexSpec); + } + } + + Set neededColumns = getColumnsForIndexes(indexesToUpdate); + Map oldColumnCells = super.getFull(row, + neededColumns, HConstants.LATEST_TIMESTAMP, null); + SortedMap oldColumnValues = convertToValueMap(oldColumnCells); + + for (IndexSpecification indexSpec : indexesToUpdate) { + removeOldIndexEntry(indexSpec, row, oldColumnValues); + } + + // Handle if there is still a version visible. 
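+    // A delete at an explicit timestamp may leave an older version of the
+    // row visible; re-add index entries for any index that still applies.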
+ if (ts != HConstants.LATEST_TIMESTAMP) { + Map currentColumnCells = super.getFull(row, + neededColumns, ts, null); + SortedMap currentColumnValues = convertToValueMap(currentColumnCells); + + for (IndexSpecification indexSpec : getIndexes()) { + if (doesApplyToIndex(indexSpec, currentColumnValues)) { + updateIndex(indexSpec, row, currentColumnValues); + } + } + } + + super.deleteAll(row, column, ts, lockid); + } + +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java new file mode 100644 index 00000000000..fd231157dd6 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/tableindexed/IndexedRegionServer.java @@ -0,0 +1,66 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.tableindexed; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.ipc.IndexedRegionInterface; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegion; +import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer; +import org.apache.hadoop.util.Progressable; + +/** + * RegionServer which maintains secondary indexes. 
+ * + **/ +public class IndexedRegionServer extends TransactionalRegionServer implements + IndexedRegionInterface { + + public IndexedRegionServer(HBaseConfiguration conf) throws IOException { + this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS, + DEFAULT_REGIONSERVER_ADDRESS)), conf); + } + + public IndexedRegionServer(HServerAddress serverAddress, + HBaseConfiguration conf) throws IOException { + super(serverAddress, conf); + } + + @Override + protected HRegion instantiateRegion(final HRegionInfo regionInfo) + throws IOException { + HRegion r = new IndexedRegion(HTableDescriptor.getTableDir(super + .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super + .getFileSystem(), super.conf, regionInfo, super.getFlushRequester()); + r.initialize(null, new Progressable() { + public void progress() { + addProcessingMessage(regionInfo); + } + }); + return r; + } + +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java index 3513ad2ba91..0a894c9e897 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java @@ -73,7 +73,7 @@ import org.apache.hadoop.util.Progressable; * will have to consult the transaction log to determine the final decision of * the transaction. This is not yet implemented. */ -class TransactionalRegion extends HRegion { +public class TransactionalRegion extends HRegion { private static final String LEASE_TIME = "hbase.transaction.leaseTime"; private static final int DEFAULT_LEASE_TIME = 60 * 1000; @@ -501,7 +501,7 @@ class TransactionalRegion extends HRegion { } for (BatchUpdate update : state.getWriteSet()) { - super.batchUpdate(update, false); // Don't need to WAL these + this.batchUpdate(update, false); // Don't need to WAL these // FIME, maybe should be walled so we don't need to look so far back. } diff --git a/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java b/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java index bf2f7b933f2..cec943b75a8 100644 --- a/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java +++ b/src/test/org/apache/hadoop/hbase/PerformanceEvaluation.java @@ -527,7 +527,7 @@ public class PerformanceEvaluation implements HConstants { * @return Returns zero-prefixed 10-byte wide decimal version of passed * number (Does absolute in case number is negative). */ - static byte [] format(final int number) { + public static byte [] format(final int number) { byte [] b = new byte[10]; int d = Math.abs(number); for (int i = b.length - 1; i >= 0; i--) { diff --git a/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java b/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java new file mode 100644 index 00000000000..b5379e60639 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/client/tableindexed/TestIndexedTable.java @@ -0,0 +1,131 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.tableindexed; + +import java.io.IOException; +import java.util.Random; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseClusterTestCase; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.PerformanceEvaluation; +import org.apache.hadoop.hbase.client.Scanner; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer; +import org.apache.hadoop.hbase.util.Bytes; + +public class TestIndexedTable extends HBaseClusterTestCase { + + private static final Log LOG = LogFactory.getLog(TestIndexedTable.class); + + private static final String TABLE_NAME = "table1"; + + private static final byte[] FAMILY = Bytes.toBytes("family:"); + private static final byte[] COL_A = Bytes.toBytes("family:a"); + private static final String INDEX_COL_A_ASC = "A-Acending"; + + private static final int NUM_ROWS = 10; + private static final int MAX_VAL = 10000; + + private IndexedTableAdmin admin; + private IndexedTable table; + private Random random = new Random(); + + /** constructor */ + public TestIndexedTable() { + conf + .set(HConstants.REGION_SERVER_IMPL, IndexedRegionServer.class.getName()); + conf.setInt("hbase.master.info.port", -1); + conf.setInt("hbase.regionserver.info.port", -1); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); + desc.addFamily(new HColumnDescriptor(FAMILY)); + + // Create a new index that does lexicographic ordering on COL_A + IndexSpecification colAIndex = new IndexSpecification(INDEX_COL_A_ASC, + COL_A, true); + desc.addIndex(colAIndex); + + admin = new IndexedTableAdmin(conf); + admin.createTable(desc); + table = new IndexedTable(conf, desc.getName()); + } + + private void writeInitalRows() throws IOException { + for (int i = 0; i < NUM_ROWS; i++) { + BatchUpdate update = new BatchUpdate(PerformanceEvaluation.format(i)); + byte[] colA = PerformanceEvaluation.format(random.nextInt(MAX_VAL)); + update.put(COL_A, colA); + table.commit(update); + LOG.info("Inserted row [" + Bytes.toString(update.getRow()) + "] val: [" + + Bytes.toString(colA) + "]"); + } + } + + + public void testInitialWrites() throws IOException { + writeInitalRows(); + assertRowsInOrder(NUM_ROWS); + } + + private void assertRowsInOrder(int numRowsExpected) throws IndexNotFoundException, IOException { + Scanner scanner = table.getIndexedScanner(INDEX_COL_A_ASC, + HConstants.EMPTY_START_ROW, null, null, null); + int numRows = 0; + byte[] lastColA = null; + for (RowResult rowResult : scanner) { + byte[] colA = rowResult.get(COL_A).getValue(); + LOG.info("index scan : row [" + Bytes.toString(rowResult.getRow()) + + "] value [" + Bytes.toString(colA) + "]"); + if (lastColA != null) { + Assert.assertTrue(Bytes.compareTo(lastColA, colA) 
<= 0); + } + lastColA = colA; + numRows++; + } + Assert.assertEquals(numRowsExpected, numRows); + } + + public void testMultipleWrites() throws IOException { + writeInitalRows(); + writeInitalRows(); // Update the rows. + assertRowsInOrder(NUM_ROWS); + } + + public void testDelete() throws IOException { + writeInitalRows(); + // Delete the first row; + table.deleteAll(PerformanceEvaluation.format(0)); + + assertRowsInOrder(NUM_ROWS - 1); + } + +}