diff --git a/conf/hbase-env.sh b/conf/hbase-env.sh
index 4d20dc62703..de80e1ea22f 100644
--- a/conf/hbase-env.sh
+++ b/conf/hbase-env.sh
@@ -23,6 +23,7 @@
# The java implementation to use. Java 1.6 required.
# export JAVA_HOME=/usr/java/jdk1.6.0/
+export JAVA_HOME=/usr/lib/jvm/java-6-sun
# Extra Java CLASSPATH elements. Optional.
# export HBASE_CLASSPATH=
diff --git a/src/contrib/build-contrib.xml b/src/contrib/build-contrib.xml
new file mode 100644
index 00000000000..b1d7b2cb544
--- /dev/null
+++ b/src/contrib/build-contrib.xml
@@ -0,0 +1,237 @@
+<!-- Shared Ant build file, imported by the build.xml of each contrib
+     module to pick up the common properties and targets (compile, jar,
+     test, and so on). Its test target stops the build with the message
+     "Tests failed!" when any unit test fails. -->
diff --git a/src/contrib/build.xml b/src/contrib/build.xml
new file mode 100644
index 00000000000..6f0e556ad88
--- /dev/null
+++ b/src/contrib/build.xml
@@ -0,0 +1,65 @@
+<!-- Top-level build file for src/contrib: delegates the standard targets
+     (compile, test, package, clean) to each contrib subproject. -->
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java
new file mode 100644
index 00000000000..bca8c556e46
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexKeyGenerator.java
@@ -0,0 +1,38 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.util.Map;
+
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Interface for generating an index-row-key from a row in the base table.
+ */
+public interface IndexKeyGenerator extends Writable {
+
+  /** Create an index key from a base row.
+   *
+   * @param rowKey the row key of the base row
+   * @param columns the columns in the base row
+   * @return the row key of the index row
+   */
+  byte[] createIndexKey(byte[] rowKey, Map<byte[], byte[]> columns);
+}
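Implementations serialize themselves via Writable so the generator can travel with the table metadata. As an illustration only, a minimal custom generator might look like the sketch below; the class name, the length-prefix scheme, and the column field are assumptions for the example, not part of this patch.

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.util.Map;

    import org.apache.hadoop.hbase.client.tableindexed.IndexKeyGenerator;
    import org.apache.hadoop.hbase.util.Bytes;

    // Hypothetical generator: prefixes the column value with its length so
    // that variable-length values sort without colliding once the base row
    // key is appended.
    public class LengthPrefixedKeyGenerator implements IndexKeyGenerator {

      private byte[] column;

      public LengthPrefixedKeyGenerator() {
        // For Writable
      }

      public LengthPrefixedKeyGenerator(byte[] column) {
        this.column = column;
      }

      public byte[] createIndexKey(byte[] rowKey, Map<byte[], byte[]> columns) {
        byte[] value = columns.get(column);
        return Bytes.add(Bytes.toBytes(value.length), Bytes.add(value, rowKey));
      }

      public void readFields(DataInput in) throws IOException {
        column = Bytes.readByteArray(in);
      }

      public void write(DataOutput out) throws IOException {
        Bytes.writeByteArray(out, column);
      }
    }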
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java
new file mode 100644
index 00000000000..3e6169c4b6b
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexNotFoundException.java
@@ -0,0 +1,47 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+
+/**
+ * Thrown when asking for an index that does not exist.
+ */
+public class IndexNotFoundException extends IOException {
+
+ private static final long serialVersionUID = 6533971528557000965L;
+
+ public IndexNotFoundException() {
+ super();
+ }
+
+ public IndexNotFoundException(String arg0) {
+ super(arg0);
+ }
+
+ public IndexNotFoundException(Throwable arg0) {
+ super(arg0.getMessage());
+ }
+
+ public IndexNotFoundException(String arg0, Throwable arg1) {
+ super(arg0+arg1.getMessage());
+ }
+
+}
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java
new file mode 100644
index 00000000000..751a569fb11
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecification.java
@@ -0,0 +1,199 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.io.WritableComparable;
+
+/** Holds the specification for a single secondary index. */
+public class IndexSpecification implements Writable {
+
+ // Columns that are indexed (part of the indexRowKey)
+ private byte[][] indexedColumns;
+
+  // Constructs the index row key from the base row.
+ private IndexKeyGenerator keyGenerator;
+
+ // Additional columns mapped into the indexed row. These will be available for
+ // filters when scanning the index.
+ private byte[][] additionalColumns;
+
+ private byte[][] allColumns;
+
+ // Id of this index, unique within a table.
+ private String indexId;
+
+  /** Construct a "simple" index spec for a single column.
+ * @param indexId
+ * @param indexedColumn
+ */
+ public IndexSpecification(String indexId, byte[] indexedColumn) {
+ this(indexId, new byte[][] { indexedColumn }, null,
+ new SimpleIndexKeyGenerator(indexedColumn));
+ }
+
+ /**
+ * Construct an index spec by specifying everything.
+ *
+ * @param indexId
+ * @param indexedColumns
+ * @param additionalColumns
+ * @param keyGenerator
+ */
+ public IndexSpecification(String indexId, byte[][] indexedColumns,
+ byte[][] additionalColumns, IndexKeyGenerator keyGenerator) {
+ this.indexId = indexId;
+ this.indexedColumns = indexedColumns;
+ this.additionalColumns = additionalColumns;
+ this.keyGenerator = keyGenerator;
+ this.makeAllColumns();
+ }
+
+ public IndexSpecification() {
+ // For writable
+ }
+
+ private void makeAllColumns() {
+ this.allColumns = new byte[indexedColumns.length
+ + (additionalColumns == null ? 0 : additionalColumns.length)][];
+ System.arraycopy(indexedColumns, 0, allColumns, 0, indexedColumns.length);
+ if (additionalColumns != null) {
+ System.arraycopy(additionalColumns, 0, allColumns, indexedColumns.length,
+ additionalColumns.length);
+ }
+ }
+
+ /**
+ * Get the indexedColumns.
+ *
+ * @return Return the indexedColumns.
+ */
+ public byte[][] getIndexedColumns() {
+ return indexedColumns;
+ }
+
+ /**
+ * Get the keyGenerator.
+ *
+ * @return Return the keyGenerator.
+ */
+ public IndexKeyGenerator getKeyGenerator() {
+ return keyGenerator;
+ }
+
+ /**
+ * Get the additionalColumns.
+ *
+ * @return Return the additionalColumns.
+ */
+ public byte[][] getAdditionalColumns() {
+ return additionalColumns;
+ }
+
+ /**
+ * Get the indexId.
+ *
+ * @return Return the indexId.
+ */
+ public String getIndexId() {
+ return indexId;
+ }
+
+ public byte[][] getAllColumns() {
+ return allColumns;
+ }
+
+ public boolean containsColumn(byte[] column) {
+ for (byte[] col : allColumns) {
+ if (Bytes.equals(column, col)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public byte[] getIndexedTableName(byte[] baseTableName) {
+ return Bytes.add(baseTableName, Bytes.toBytes("-" + indexId));
+ }
+
+ private static final HBaseConfiguration CONF = new HBaseConfiguration();
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput in) throws IOException {
+ indexId = in.readUTF();
+ int numIndexedCols = in.readInt();
+ indexedColumns = new byte[numIndexedCols][];
+ for (int i = 0; i < numIndexedCols; i++) {
+ indexedColumns[i] = Bytes.readByteArray(in);
+ }
+ int numAdditionalCols = in.readInt();
+ additionalColumns = new byte[numAdditionalCols][];
+ for (int i = 0; i < numAdditionalCols; i++) {
+ additionalColumns[i] = Bytes.readByteArray(in);
+ }
+ makeAllColumns();
+ keyGenerator = (IndexKeyGenerator) ObjectWritable.readObject(in, CONF);
+
+    // FIXME: this read consumes the deprecated comparator stored with existing data
+ ObjectWritable.readObject(in, CONF);
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ out.writeUTF(indexId);
+ out.writeInt(indexedColumns.length);
+ for (byte[] col : indexedColumns) {
+ Bytes.writeByteArray(out, col);
+ }
+ if (additionalColumns != null) {
+ out.writeInt(additionalColumns.length);
+ for (byte[] col : additionalColumns) {
+ Bytes.writeByteArray(out, col);
+ }
+ } else {
+ out.writeInt(0);
+ }
+ ObjectWritable
+ .writeObject(out, keyGenerator, IndexKeyGenerator.class, CONF);
+
+    // FIXME: need to maintain this for existing data
+ ObjectWritable.writeObject(out, null, WritableComparable.class,
+ CONF);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("ID => ");
+ sb.append(indexId);
+ return sb.toString();
+ }
+
+
+}
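For reference, the two constructors above might be used as in this sketch; the table and column names are made up for illustration.

    import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
    import org.apache.hadoop.hbase.client.tableindexed.SimpleIndexKeyGenerator;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IndexSpecExample {
      public static void main(String[] args) {
        // Simple form: one indexed column; a SimpleIndexKeyGenerator is implied.
        IndexSpecification byLastName =
            new IndexSpecification("byLastName", Bytes.toBytes("info:lastName"));

        // Full form: two indexed columns, plus an extra column copied into the
        // index table so scans on the index can filter on it.
        IndexSpecification byNameWithAge = new IndexSpecification(
            "byNameWithAge",
            new byte[][] { Bytes.toBytes("info:lastName"),
                Bytes.toBytes("info:firstName") },
            new byte[][] { Bytes.toBytes("info:age") },
            new SimpleIndexKeyGenerator(Bytes.toBytes("info:lastName")));

        System.out.println(byLastName + ", " + byNameWithAge);
      }
    }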
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecificationArray.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecificationArray.java
new file mode 100644
index 00000000000..afb9c841a2d
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexSpecificationArray.java
@@ -0,0 +1,45 @@
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+/** Holds an array of index specifications.
+ *
+ */
+public class IndexSpecificationArray implements Writable {
+
+ private IndexSpecification [] indexSpecifications;
+
+ public IndexSpecificationArray() {
+    // For Writable
+ }
+ public IndexSpecificationArray(IndexSpecification[] specs) {
+ this.indexSpecifications = specs;
+ }
+
+  public void readFields(DataInput in) throws IOException {
+    int size = in.readInt();
+    indexSpecifications = new IndexSpecification[size];
+    for (int i = 0; i < size; i++) {
+      indexSpecifications[i] = new IndexSpecification();
+      indexSpecifications[i].readFields(in);
+    }
+  }
+
+  public void write(DataOutput out) throws IOException {
+    out.writeInt(indexSpecifications.length);
+    for (IndexSpecification indexSpec : indexSpecifications) {
+      indexSpec.write(out);
+    }
+  }
+
+  /** @return the index specifications held in this array */
+  public IndexSpecification[] getIndexSpecifications() {
+    return indexSpecifications;
+  }
+}
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
new file mode 100644
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTable.java
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
+import org.apache.hadoop.hbase.filter.Filter;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.HbaseMapWritable;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** HTable extended with indexed support. */
+public class IndexedTable extends TransactionalTable {
+
+  // FIXME, these belong elsewhere
+  static final byte[] INDEX_COL_FAMILY_NAME = Bytes.toBytes("__INDEX__");
+  static final byte[] INDEX_COL_FAMILY = Bytes.add(INDEX_COL_FAMILY_NAME,
+      new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER });
+  public static final byte[] INDEX_BASE_ROW = Bytes.toBytes("ROW");
+  public static final byte[] INDEX_BASE_ROW_COLUMN = Bytes.add(
+      INDEX_COL_FAMILY, INDEX_BASE_ROW);
+
+  static final Log LOG = LogFactory.getLog(IndexedTable.class);
+
+  private final IndexedTableDescriptor indexedTableDescriptor;
+  private Map<String, HTable> indexIdToTable = new HashMap<String, HTable>();
+
+ public IndexedTable(final HBaseConfiguration conf, final byte[] tableName)
+ throws IOException {
+ super(conf, tableName);
+ this.indexedTableDescriptor = new IndexedTableDescriptor(super.getTableDescriptor());
+ for (IndexSpecification spec : this.indexedTableDescriptor.getIndexes()) {
+ indexIdToTable.put(spec.getIndexId(), new HTable(conf, spec
+ .getIndexedTableName(tableName)));
+ }
+ }
+
+ public IndexedTableDescriptor getIndexedTableDescriptor() {
+ return this.indexedTableDescriptor;
+ }
+
+ /**
+   * Open up an indexed scanner. Results will come back in the index order,
+   * but will contain Results built from the original table.
+ *
+ * @param indexId the id of the index to use
+ * @param indexStartRow (created from the IndexKeyGenerator)
+ * @param indexColumns in the index table
+   * @param indexFilter filter to run on the indexed table. This can only use
+   *          columns that have been added to the index.
+ * @param baseColumns from the original table
+ * @return scanner
+ * @throws IOException
+ * @throws IndexNotFoundException
+ */
+ public ResultScanner getIndexedScanner(String indexId, final byte[] indexStartRow,
+ byte[][] indexColumns, final Filter indexFilter,
+ final byte[][] baseColumns) throws IOException, IndexNotFoundException {
+ IndexSpecification indexSpec = this.indexedTableDescriptor.getIndex(indexId);
+ if (indexSpec == null) {
+ throw new IndexNotFoundException("Index " + indexId
+ + " not defined in table "
+ + super.getTableDescriptor().getNameAsString());
+ }
+ verifyIndexColumns(indexColumns, indexSpec);
+ // TODO, verify/remove index columns from baseColumns
+
+ HTable indexTable = indexIdToTable.get(indexId);
+
+ byte[][] allIndexColumns;
+ if (indexColumns != null) {
+ allIndexColumns = new byte[indexColumns.length + 1][];
+ System
+ .arraycopy(indexColumns, 0, allIndexColumns, 0, indexColumns.length);
+ allIndexColumns[indexColumns.length] = INDEX_BASE_ROW_COLUMN;
+ } else {
+ byte[][] allColumns = indexSpec.getAllColumns();
+ allIndexColumns = new byte[allColumns.length + 1][];
+ System.arraycopy(allColumns, 0, allIndexColumns, 0, allColumns.length);
+ allIndexColumns[allColumns.length] = INDEX_BASE_ROW_COLUMN;
+ }
+
+ Scan indexScan = new Scan(indexStartRow);
+    // indexScan.setFilter(indexFilter); // FIXME: the index filter is not applied yet
+ indexScan.addColumns(allIndexColumns);
+ ResultScanner indexScanner = indexTable.getScanner(indexScan);
+
+ return new ScannerWrapper(indexScanner, baseColumns);
+ }
+
+ private void verifyIndexColumns(byte[][] requestedColumns,
+ IndexSpecification indexSpec) {
+ if (requestedColumns == null) {
+ return;
+ }
+ for (byte[] requestedColumn : requestedColumns) {
+ boolean found = false;
+ for (byte[] indexColumn : indexSpec.getAllColumns()) {
+ if (Bytes.equals(requestedColumn, indexColumn)) {
+ found = true;
+ break;
+ }
+ }
+ if (!found) {
+ throw new RuntimeException("Column [" + Bytes.toString(requestedColumn)
+ + "] not in index " + indexSpec.getIndexId());
+ }
+ }
+ }
+
+ private class ScannerWrapper implements ResultScanner {
+
+ private ResultScanner indexScanner;
+ private byte[][] columns;
+
+ public ScannerWrapper(ResultScanner indexScanner, byte[][] columns) {
+ this.indexScanner = indexScanner;
+ this.columns = columns;
+ }
+
+ /** {@inheritDoc} */
+ public Result next() throws IOException {
+ Result[] result = next(1);
+ if (result == null || result.length < 1)
+ return null;
+ return result[0];
+ }
+
+ /** {@inheritDoc} */
+ public Result[] next(int nbRows) throws IOException {
+ Result[] indexResult = indexScanner.next(nbRows);
+ if (indexResult == null) {
+ return null;
+ }
+ Result[] result = new Result[indexResult.length];
+ for (int i = 0; i < indexResult.length; i++) {
+ Result row = indexResult[i];
+
+ byte[] baseRow = row.getValue(INDEX_BASE_ROW_COLUMN);
+ LOG.debug("next index row [" + Bytes.toString(row.getRow())
+ + "] -> base row [" + Bytes.toString(baseRow) + "]");
+        HbaseMapWritable<byte[], Cell> colValues =
+          new HbaseMapWritable<byte[], Cell>();
+ Result baseResult = null;
+ if (columns != null && columns.length > 0) {
+ LOG.debug("Going to base table for remaining columns");
+ Get baseGet = new Get(baseRow);
+ baseGet.addColumns(columns);
+ baseResult = IndexedTable.this.get(baseGet);
+ }
+
+        List<KeyValue> results = new ArrayList<KeyValue>();
+ for (KeyValue indexKV : row.list()) {
+ byte[] col = indexKV.getColumn();
+ if (HStoreKey.matchingFamily(INDEX_COL_FAMILY_NAME, col)) {
+ continue;
+ }
+ results.add(new KeyValue(baseRow, indexKV.getColumn(), indexKV.getTimestamp(), KeyValue.Type.Put, indexKV.getValue()));
+ }
+
+ if (baseResult != null) {
+ results.addAll(baseResult.list());
+ }
+
+ result[i] = new Result(results);
+ }
+ return result;
+ }
+
+ /** {@inheritDoc} */
+ public void close() {
+ indexScanner.close();
+ }
+
+ // Copied from HTable.ClientScanner.iterator()
+    public Iterator<Result> iterator() {
+      return new Iterator<Result>() {
+        // The next Result, possibly pre-read
+        Result next = null;
+
+ // return true if there is another item pending, false if there isn't.
+ // this method is where the actual advancing takes place, but you need
+ // to call next() to consume it. hasNext() will only advance if there
+ // isn't a pending next().
+ public boolean hasNext() {
+ if (next == null) {
+ try {
+ next = ScannerWrapper.this.next();
+ return next != null;
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return true;
+ }
+
+ // get the pending next item and advance the iterator. returns null if
+ // there is no next item.
+ public Result next() {
+ // since hasNext() does the real advancing, we call this to determine
+ // if there is a next before proceeding.
+ if (!hasNext()) {
+ return null;
+ }
+
+ // if we get to here, then hasNext() has given us an item to return.
+ // we want to return the item and then null out the next pointer, so
+ // we use a temporary variable.
+ Result temp = next;
+ next = null;
+ return temp;
+ }
+
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ }
+}
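A sketch of scanning through an index, under the assumption that a "people" table with a "byLastName" index (as in the examples above) already exists:

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.tableindexed.IndexedTable;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IndexedScanExample {
      public static void main(String[] args) throws IOException {
        IndexedTable table =
            new IndexedTable(new HBaseConfiguration(), Bytes.toBytes("people"));
        // Start from the first index key >= "smith". Rows come back in index
        // order; "info:age" is fetched from the base table for each hit.
        ResultScanner scanner = table.getIndexedScanner("byLastName",
            Bytes.toBytes("smith"), null, null,
            new byte[][] { Bytes.toBytes("info:age") });
        try {
          for (Result result : scanner) {
            System.out.println(Bytes.toString(result.getRow()));
          }
        } finally {
          scanner.close();
        }
      }
    }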
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java
new file mode 100644
index 00000000000..49b4b19e6ed
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableAdmin.java
@@ -0,0 +1,149 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.IOException;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ColumnNameParseException;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HStoreKey;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.MasterNotRunningException;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.regionserver.tableindexed.IndexMaintenanceUtils;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Extension of HBaseAdmin that creates indexed tables.
+ *
+ */
+public class IndexedTableAdmin extends HBaseAdmin {
+
+ private static final Log LOG = LogFactory.getLog(IndexedTableAdmin.class);
+
+ /**
+ * Constructor
+ *
+ * @param conf Configuration object
+ * @throws MasterNotRunningException
+ */
+ public IndexedTableAdmin(HBaseConfiguration conf)
+ throws MasterNotRunningException {
+ super(conf);
+ }
+
+ /**
+ * Creates a new indexed table
+ *
+ * @param desc table descriptor for table
+ *
+ * @throws IOException
+ */
+ public void createIndexedTable(IndexedTableDescriptor desc) throws IOException {
+ super.createTable(desc.getBaseTableDescriptor());
+ this.createIndexTables(desc);
+ }
+
+ private void createIndexTables(IndexedTableDescriptor indexDesc) throws IOException {
+ byte[] baseTableName = indexDesc.getBaseTableDescriptor().getName();
+ for (IndexSpecification indexSpec : indexDesc.getIndexes()) {
+ HTableDescriptor indexTableDesc = createIndexTableDesc(baseTableName,
+ indexSpec);
+ super.createTable(indexTableDesc);
+ }
+ }
+
+ private HTableDescriptor createIndexTableDesc(byte[] baseTableName,
+ IndexSpecification indexSpec) throws ColumnNameParseException {
+ HTableDescriptor indexTableDesc = new HTableDescriptor(indexSpec
+ .getIndexedTableName(baseTableName));
+    Set<byte[]> families = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
+ families.add(IndexedTable.INDEX_COL_FAMILY);
+ for (byte[] column : indexSpec.getAllColumns()) {
+ families.add(Bytes.add(HStoreKey.getFamily(column),
+ new byte[] { HStoreKey.COLUMN_FAMILY_DELIMITER }));
+ }
+
+ for (byte[] colFamily : families) {
+ indexTableDesc.addFamily(new HColumnDescriptor(colFamily));
+ }
+
+ return indexTableDesc;
+ }
+
+  /** Remove an index from a table.
+   * @param baseTableName name of the base table
+   * @param indexId id of the index to remove
+   * @throws IOException
+   */
+ public void removeIndex(byte[] baseTableName, String indexId) throws IOException {
+ super.disableTable(baseTableName);
+ HTableDescriptor desc = super.getTableDescriptor(baseTableName);
+ IndexedTableDescriptor indexDesc = new IndexedTableDescriptor(desc);
+ IndexSpecification spec = indexDesc.getIndex(indexId);
+ indexDesc.removeIndex(indexId);
+ this.disableTable(spec.getIndexedTableName(baseTableName));
+ this.deleteTable(spec.getIndexedTableName(baseTableName));
+ super.modifyTable(baseTableName, desc);
+ super.enableTable(baseTableName);
+ }
+
+ /** Add an index to a table. */
+ public void addIndex(byte []baseTableName, IndexSpecification indexSpec) throws IOException {
+ LOG.warn("Adding index to existing table ["+Bytes.toString(baseTableName)+"], this may take a long time");
+ // TODO, make table read-only
+ LOG.warn("Not putting table in readonly, if its being written to, the index may get out of sync");
+ HTableDescriptor indexTableDesc = createIndexTableDesc(baseTableName, indexSpec);
+ super.createTable(indexTableDesc);
+ super.disableTable(baseTableName);
+ IndexedTableDescriptor indexDesc = new IndexedTableDescriptor(super.getTableDescriptor(baseTableName));
+ indexDesc.addIndex(indexSpec);
+ super.modifyTable(baseTableName, indexDesc.getBaseTableDescriptor());
+ super.enableTable(baseTableName);
+ reIndexTable(baseTableName, indexSpec);
+ }
+
+ private void reIndexTable(byte[] baseTableName, IndexSpecification indexSpec) throws IOException {
+ HTable baseTable = new HTable(baseTableName);
+ HTable indexTable = new HTable(indexSpec.getIndexedTableName(baseTableName));
+ for (RowResult rowResult : baseTable.getScanner(indexSpec.getAllColumns())) {
+      SortedMap<byte[], byte[]> columnValues =
+          new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
+      for (Entry<byte[], Cell> entry : rowResult.entrySet()) {
+ columnValues.put(entry.getKey(), entry.getValue().getValue());
+ }
+ if (IndexMaintenanceUtils.doesApplyToIndex(indexSpec, columnValues)) {
+ Put indexUpdate = IndexMaintenanceUtils.createIndexUpdate(indexSpec, rowResult.getRow(), columnValues);
+ indexTable.put(indexUpdate);
+ }
+ }
+ }
+}
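Putting the pieces together, creating a table with an index up front might look like this sketch; the table and column names are illustrative only.

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.HColumnDescriptor;
    import org.apache.hadoop.hbase.HTableDescriptor;
    import org.apache.hadoop.hbase.client.tableindexed.IndexSpecification;
    import org.apache.hadoop.hbase.client.tableindexed.IndexedTableAdmin;
    import org.apache.hadoop.hbase.client.tableindexed.IndexedTableDescriptor;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CreateIndexedTableExample {
      public static void main(String[] args) throws IOException {
        HTableDescriptor baseDesc = new HTableDescriptor("people");
        baseDesc.addFamily(new HColumnDescriptor("info:"));

        // Attach the index metadata before the table is created.
        IndexedTableDescriptor desc = new IndexedTableDescriptor(baseDesc);
        desc.addIndex(new IndexSpecification("byLastName",
            Bytes.toBytes("info:lastName")));

        // Creates both "people" and the derived index table "people-byLastName".
        IndexedTableAdmin admin = new IndexedTableAdmin(new HBaseConfiguration());
        admin.createIndexedTable(desc);
      }
    }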
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableDescriptor.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableDescriptor.java
new file mode 100644
index 00000000000..5ac99d201e9
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/IndexedTableDescriptor.java
@@ -0,0 +1,115 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.ObjectWritable;
+import org.apache.hadoop.io.Writable;
+
+public class IndexedTableDescriptor {
+
+ private static final byte[] INDEXES_KEY = Bytes.toBytes("INDEXES");
+
+ private final HTableDescriptor baseTableDescriptor;
+ // Key is indexId
+  private final Map<String, IndexSpecification> indexes = new HashMap<String, IndexSpecification>();
+
+ public IndexedTableDescriptor(HTableDescriptor baseTableDescriptor)
+ throws IOException {
+ this.baseTableDescriptor = baseTableDescriptor;
+ readFromTable();
+ }
+
+ public HTableDescriptor getBaseTableDescriptor() {
+ return this.baseTableDescriptor;
+ }
+
+ private void readFromTable() throws IOException {
+ byte [] bytes = baseTableDescriptor.getValue(INDEXES_KEY);
+ if (bytes == null) {
+ return;
+ }
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+ DataInputStream dis = new DataInputStream(bais);
+ IndexSpecificationArray indexArray = new IndexSpecificationArray();
+ indexArray.readFields(dis);
+ for (Writable index : indexArray.getIndexSpecifications()) {
+ IndexSpecification indexSpec = (IndexSpecification) index;
+ indexes.put(indexSpec.getIndexId(), indexSpec);
+ }
+ }
+
+ private void writeToTable() {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream dos = new DataOutputStream(baos);
+ IndexSpecificationArray indexArray = new IndexSpecificationArray(indexes.values().toArray(new IndexSpecification[0]));
+
+ try {
+ indexArray.write(dos);
+ dos.flush();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ baseTableDescriptor.setValue(INDEXES_KEY, baos.toByteArray());
+ }
+
+  public Collection<IndexSpecification> getIndexes() {
+ return indexes.values();
+ }
+
+ public IndexSpecification getIndex(String indexId) {
+ return indexes.get(indexId);
+ }
+
+ public void addIndex(IndexSpecification index) {
+ indexes.put(index.getIndexId(), index);
+ writeToTable();
+ }
+
+ public void removeIndex(String indexId) {
+ indexes.remove(indexId);
+ writeToTable();
+ }
+
+ @Override
+ public String toString() {
+ StringBuilder s = new StringBuilder(baseTableDescriptor.toString());
+
+ if (!indexes.isEmpty()) {
+ s.append(", ");
+ s.append("INDEXES");
+ s.append(" => ");
+ s.append(indexes.values());
+ }
+ return s.toString();
+ }
+}
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java
new file mode 100644
index 00000000000..49694177a70
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/SimpleIndexKeyGenerator.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.tableindexed;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.hbase.util.Bytes;
+
+/** Creates index keys for a single column: the key is the column value
+ * followed by the base row key.
+ */
+public class SimpleIndexKeyGenerator implements IndexKeyGenerator {
+
+ private byte [] column;
+
+ public SimpleIndexKeyGenerator(byte [] column) {
+ this.column = column;
+ }
+
+ public SimpleIndexKeyGenerator() {
+ // For Writable
+ }
+
+ /** {@inheritDoc} */
+  public byte[] createIndexKey(byte[] rowKey, Map<byte[], byte[]> columns) {
+ return Bytes.add(columns.get(column), rowKey);
+ }
+
+ /** {@inheritDoc} */
+ public void readFields(DataInput in) throws IOException {
+ column = Bytes.readByteArray(in);
+ }
+
+ /** {@inheritDoc} */
+ public void write(DataOutput out) throws IOException {
+ Bytes.writeByteArray(out, column);
+ }
+
+}
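To make the key layout concrete: for a base row "row1" whose indexed column holds "smith", the generated index key is the column value with the base row key appended. The values in this sketch are made up.

    import java.util.Map;
    import java.util.TreeMap;

    import org.apache.hadoop.hbase.client.tableindexed.SimpleIndexKeyGenerator;
    import org.apache.hadoop.hbase.util.Bytes;

    public class KeyLayoutExample {
      public static void main(String[] args) {
        SimpleIndexKeyGenerator generator =
            new SimpleIndexKeyGenerator(Bytes.toBytes("info:lastName"));
        Map<byte[], byte[]> columns =
            new TreeMap<byte[], byte[]>(Bytes.BYTES_COMPARATOR);
        columns.put(Bytes.toBytes("info:lastName"), Bytes.toBytes("smith"));
        byte[] indexKey = generator.createIndexKey(Bytes.toBytes("row1"), columns);
        System.out.println(Bytes.toString(indexKey)); // prints "smithrow1"
      }
    }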
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html
new file mode 100644
index 00000000000..36214f7ed25
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/tableindexed/package.html
@@ -0,0 +1,46 @@
+<!--
+   Copyright 2008 The Apache Software Foundation
+
+   Licensed to the Apache Software Foundation (ASF) under one
+   or more contributor license agreements. See the NOTICE file
+   distributed with this work for additional information
+   regarding copyright ownership. The ASF licenses this file
+   to you under the Apache License, Version 2.0 (the
+   "License"); you may not use this file except in compliance
+   with the License. You may obtain a copy of the License at
+
+   http://www.apache.org/licenses/LICENSE-2.0
+
+   Unless required by applicable law or agreed to in writing, software
+   distributed under the License is distributed on an "AS IS" BASIS,
+   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+   See the License for the specific language governing permissions and
+   limitations under the License.
+-->
+<html>
+<body>
+This package provides support for secondary indexing by maintaining a separate
+"index" table for each index.
+
+The IndexSpecification class provides the metadata for the index. This includes:
+<ul>
+<li> the columns that contribute to the index key,</li>
+<li> additional columns to put in the index table (and are thus made available to
+filters on the index table), and</li>
+<li> an IndexKeyGenerator which constructs the index-row-key from the indexed
+column(s) and the original row.</li>
+</ul>
+
+IndexSpecifications can be added to a table's metadata (HTableDescriptor) before
+the table is constructed. Afterwards, updates and deletes to the original table
+will trigger updates in the index, and the indexes can be scanned using the API
+on IndexedTable.
+
+For a simple example, look at the unit test in org.apache.hadoop.hbase.client.tableIndexed.
+
+<p>To enable the indexing, modify hbase-site.xml to turn on the
+IndexedRegionServer. This is done by setting
+<i>hbase.regionserver.class</i> to
+<i>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</i> and
+<i>hbase.regionserver.impl</i> to
+<i>org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer</i>
+</p>
+
+</body>
+</html>
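In hbase-site.xml, the two settings described above would be written as follows; this is a sketch built from the property names in the text:

    <property>
      <name>hbase.regionserver.class</name>
      <value>org.apache.hadoop.hbase.ipc.IndexedRegionInterface</value>
    </property>
    <property>
      <name>hbase.regionserver.impl</name>
      <value>org.apache.hadoop.hbase.regionserver.tableindexed.IndexedRegionServer</value>
    </property>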
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java
new file mode 100644
index 00000000000..76573630071
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java
@@ -0,0 +1,56 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/** Thrown when a transaction cannot be committed.
+ *
+ */
+public class CommitUnsuccessfulException extends Exception {
+
+ private static final long serialVersionUID = 7062921444531109202L;
+
+ /** Default Constructor */
+ public CommitUnsuccessfulException() {
+ super();
+ }
+
+ /**
+ * @param arg0 message
+ * @param arg1 cause
+ */
+ public CommitUnsuccessfulException(String arg0, Throwable arg1) {
+ super(arg0, arg1);
+ }
+
+ /**
+ * @param arg0 message
+ */
+ public CommitUnsuccessfulException(String arg0) {
+ super(arg0);
+ }
+
+ /**
+ * @param arg0 cause
+ */
+ public CommitUnsuccessfulException(Throwable arg0) {
+ super(arg0);
+ }
+
+}
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java
new file mode 100644
index 00000000000..01a4d75565b
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java
@@ -0,0 +1,120 @@
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.io.RowResult;
+import org.apache.hadoop.hbase.util.Bytes;
+
+public class HBaseBackedTransactionLogger implements TransactionLogger {
+
+ /** The name of the transaction status table. */
+ public static final String TABLE_NAME = "__GLOBAL_TRX_LOG__";
+
+ private static final String INFO_FAMILY = "Info:";
+
+ /**
+ * Column which holds the transaction status.
+ *
+ */
+ private static final String STATUS_COLUMN = INFO_FAMILY + "Status";
+ private static final byte[] STATUS_COLUMN_BYTES = Bytes
+ .toBytes(STATUS_COLUMN);
+
+ /**
+ * Create the table.
+ *
+ * @throws IOException
+ *
+ */
+ public static void createTable() throws IOException {
+ HTableDescriptor tableDesc = new HTableDescriptor(TABLE_NAME);
+ tableDesc.addFamily(new HColumnDescriptor(INFO_FAMILY));
+ HBaseAdmin admin = new HBaseAdmin(new HBaseConfiguration());
+ admin.createTable(tableDesc);
+ }
+
+ private Random random = new Random();
+ private HTable table;
+
+ public HBaseBackedTransactionLogger() throws IOException {
+ initTable();
+ }
+
+ private void initTable() throws IOException {
+ HBaseAdmin admin = new HBaseAdmin(new HBaseConfiguration());
+
+ if (!admin.tableExists(TABLE_NAME)) {
+ throw new RuntimeException("Table not created. Call createTable() first");
+ }
+ this.table = new HTable(TABLE_NAME);
+
+ }
+
+ public long createNewTransactionLog() {
+ long id;
+ TransactionStatus existing;
+
+ do {
+ id = random.nextLong();
+ existing = getStatusForTransaction(id);
+ } while (existing != null);
+
+ setStatusForTransaction(id, TransactionStatus.PENDING);
+
+ return id;
+ }
+
+ public TransactionStatus getStatusForTransaction(long transactionId) {
+ try {
+ RowResult result = table.getRow(getRow(transactionId));
+ if (result == null || result.isEmpty()) {
+ return null;
+ }
+ Cell statusCell = result.get(STATUS_COLUMN_BYTES);
+ if (statusCell == null) {
+ throw new RuntimeException("No status cell for row " + transactionId);
+ }
+ String statusString = Bytes.toString(statusCell.getValue());
+ return TransactionStatus.valueOf(statusString);
+
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ private byte [] getRow(long transactionId) {
+ return Bytes.toBytes(""+transactionId);
+ }
+
+ public void setStatusForTransaction(long transactionId,
+ TransactionStatus status) {
+ BatchUpdate update = new BatchUpdate(getRow(transactionId));
+ update.put(STATUS_COLUMN, Bytes.toBytes(status.name()));
+
+ try {
+ table.commit(update);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ public void forgetTransaction(long transactionId) {
+ BatchUpdate update = new BatchUpdate(getRow(transactionId));
+ update.delete(STATUS_COLUMN);
+
+ try {
+ table.commit(update);
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+}
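A sketch of wiring this logger into the transaction manager; the one-time createTable() call must happen before the first logger is constructed:

    import java.io.IOException;

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
    import org.apache.hadoop.hbase.client.transactional.TransactionManager;
    import org.apache.hadoop.hbase.client.transactional.TransactionState;

    public class GlobalLogExample {
      public static void main(String[] args) throws IOException {
        // One-time setup: create the __GLOBAL_TRX_LOG__ table.
        HBaseBackedTransactionLogger.createTable();

        // Use the HBase-backed log instead of the in-memory default so that
        // region servers can look up transaction status after a client failure.
        TransactionManager manager = new TransactionManager(
            new HBaseBackedTransactionLogger(), new HBaseConfiguration());
        TransactionState state = manager.beginTransaction();
        System.out.println("Started transaction " + state);
      }
    }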
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/JtaXAResource.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/JtaXAResource.java
new file mode 100644
index 00000000000..21ed9933d6b
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/JtaXAResource.java
@@ -0,0 +1,136 @@
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+
+/**
+ * View HBase as a JTA transactional resource. This allows it to participate in
+ * transactions across multiple resources.
+ *
+ *
+ */
+public class JtaXAResource implements XAResource {
+
+ static final Log LOG = LogFactory.getLog(JtaXAResource.class);
+
+  private Map<Xid, TransactionState> xidToTransactionState =
+      new HashMap<Xid, TransactionState>();
+ private final TransactionManager transactionManager;
+  private ThreadLocal<TransactionState> threadLocalTransactionState =
+      new ThreadLocal<TransactionState>();
+
+ public JtaXAResource(TransactionManager transactionManager) {
+ this.transactionManager = transactionManager;
+ }
+
+ public void commit(Xid xid, boolean onePhase) throws XAException {
+ LOG.trace("commit [" + xid.toString() + "] "
+ + (onePhase ? "one phase" : "two phase"));
+ TransactionState state = xidToTransactionState.remove(xid);
+ if (state == null) {
+ throw new XAException(XAException.XAER_NOTA);
+ }
+ try {
+ if (onePhase) {
+ transactionManager.tryCommit(state);
+ } else {
+ transactionManager.doCommit(state);
+ }
+ } catch (CommitUnsuccessfulException e) {
+ throw new XAException(XAException.XA_RBROLLBACK);
+ } catch (IOException e) {
+ throw new XAException(XAException.XA_RBPROTO); // FIXME correct code?
+ } finally {
+ threadLocalTransactionState.remove();
+ }
+
+ }
+
+ public void end(Xid xid, int flags) throws XAException {
+ LOG.trace("end [" + xid.toString() + "] ");
+ threadLocalTransactionState.remove();
+ }
+
+ public void forget(Xid xid) throws XAException {
+ LOG.trace("forget [" + xid.toString() + "] ");
+ threadLocalTransactionState.remove();
+ TransactionState state = xidToTransactionState.remove(xid);
+ if (state != null) {
+ try {
+ transactionManager.abort(state);
+ } catch (IOException e) {
+ throw new RuntimeException(e); // FIXME, should be an XAException?
+ }
+ }
+ }
+
+ public int getTransactionTimeout() throws XAException {
+ return 0;
+ }
+
+ public boolean isSameRM(XAResource xares) throws XAException {
+    return xares instanceof JtaXAResource;
+ }
+
+ public int prepare(Xid xid) throws XAException {
+ LOG.trace("prepare [" + xid.toString() + "] ");
+ TransactionState state = xidToTransactionState.get(xid);
+ int status;
+ try {
+ status = this.transactionManager.prepareCommit(state);
+ } catch (CommitUnsuccessfulException e) {
+ throw new XAException(XAException.XA_HEURRB); // FIXME correct code?
+ } catch (IOException e) {
+ throw new XAException(XAException.XA_RBPROTO); // FIXME correct code?
+ }
+
+ switch (status) {
+ case TransactionalRegionInterface.COMMIT_OK:
+ return XAResource.XA_OK;
+ case TransactionalRegionInterface.COMMIT_OK_READ_ONLY:
+ return XAResource.XA_RDONLY;
+ default:
+ throw new XAException(XAException.XA_RBPROTO); // FIXME correct code?
+ }
+ }
+
+ public Xid[] recover(int flag) throws XAException {
+ return xidToTransactionState.keySet().toArray(new Xid[] {});
+ }
+
+ public void rollback(Xid xid) throws XAException {
+ LOG.trace("rollback [" + xid.toString() + "] ");
+ forget(xid);
+ threadLocalTransactionState.remove();
+ }
+
+ public boolean setTransactionTimeout(int seconds) throws XAException {
+ return false; // Currently not supported. (Only global lease time)
+ }
+
+ public void start(Xid xid, int flags) throws XAException {
+ LOG.trace("start [" + xid.toString() + "] ");
+ // TODO, check flags
+ TransactionState state = this.transactionManager.beginTransaction();
+ threadLocalTransactionState.set(state);
+ xidToTransactionState.put(xid, state);
+ }
+
+ /**
+   * @return the thread-local transaction state.
+ */
+ public TransactionState getThreadLocalTransactionState() {
+ return threadLocalTransactionState.get();
+ }
+
+}
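How the resource might be enlisted with a JTA coordinator, as a sketch; obtaining the javax.transaction.Transaction is container-specific and assumed here:

    import javax.transaction.Transaction;

    import org.apache.hadoop.hbase.client.transactional.JtaXAResource;
    import org.apache.hadoop.hbase.client.transactional.TransactionManager;

    public class JtaEnlistExample {
      public static void enlist(Transaction jtaTransaction,
          TransactionManager hbaseManager) throws Exception {
        JtaXAResource resource = hbaseManager.getXAResource();
        jtaTransaction.enlistResource(resource);
        // From here the JTA coordinator drives prepare/commit/rollback on the
        // resource as part of the global transaction.
      }
    }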
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java
new file mode 100644
index 00000000000..a36a7dfaa6e
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java
@@ -0,0 +1,79 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+
+/**
+ * A local, in-memory implementation of the transaction logger. Does not provide
+ * a global view, so it cannot be relied on by processes that need to see the
+ * transaction status remotely (for example, region servers recovering after a
+ * client failure).
+ *
+ */
+public class LocalTransactionLogger implements TransactionLogger {
+
+ private static LocalTransactionLogger instance;
+
+ /**
+ * Creates singleton if it does not exist
+ *
+ * @return reference to singleton
+ */
+  public static synchronized LocalTransactionLogger getInstance() {
+ if (instance == null) {
+ instance = new LocalTransactionLogger();
+ }
+ return instance;
+ }
+
+ private Random random = new Random();
+  private Map<Long, TransactionStatus> transactionIdToStatusMap = Collections
+      .synchronizedMap(new HashMap<Long, TransactionStatus>());
+
+ private LocalTransactionLogger() {
+    // Enforce singleton
+ }
+
+  /** @return a random long id, chosen to minimize the possibility of collision */
+ public long createNewTransactionLog() {
+ long id;
+ do {
+ id = random.nextLong();
+ } while (transactionIdToStatusMap.containsKey(id));
+ transactionIdToStatusMap.put(id, TransactionStatus.PENDING);
+ return id;
+ }
+
+ public TransactionStatus getStatusForTransaction(final long transactionId) {
+ return transactionIdToStatusMap.get(transactionId);
+ }
+
+ public void setStatusForTransaction(final long transactionId,
+ final TransactionStatus status) {
+ transactionIdToStatusMap.put(transactionId, status);
+ }
+
+ public void forgetTransaction(long transactionId) {
+ transactionIdToStatusMap.remove(transactionId);
+ }
+
+}
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
new file mode 100644
index 00000000000..5ea321aec74
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java
@@ -0,0 +1,59 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+/**
+ * Simple interface used to provide a log about transaction status. Written to
+ * by the client, and read by regionservers in case of failure.
+ *
+ */
+public interface TransactionLogger {
+
+ /** Transaction status values */
+ enum TransactionStatus {
+ /** Transaction is pending */
+ PENDING,
+ /** Transaction was committed */
+ COMMITTED,
+ /** Transaction was aborted */
+ ABORTED
+ }
+
+ /**
+   * Create a new transaction log. Return the transaction's globally unique id.
+   * The log's initial status should be PENDING.
+ *
+ * @return transaction id
+ */
+ long createNewTransactionLog();
+
+ /**
+ * @param transactionId
+ * @return transaction status
+ */
+ TransactionStatus getStatusForTransaction(long transactionId);
+
+ /**
+ * @param transactionId
+ * @param status
+ */
+ void setStatusForTransaction(long transactionId, TransactionStatus status);
+
+}
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
new file mode 100644
index 00000000000..9fe52700d84
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java
@@ -0,0 +1,239 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+import java.util.Iterator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.NotServingRegionException;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HConnectionManager;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+
+/**
+ * Transaction Manager. Responsible for committing transactions.
+ *
+ */
+public class TransactionManager {
+ static final Log LOG = LogFactory.getLog(TransactionManager.class);
+
+ private final HConnection connection;
+ private final TransactionLogger transactionLogger;
+ private JtaXAResource xAResource;
+
+ /**
+ * @param conf
+ */
+ public TransactionManager(final HBaseConfiguration conf) {
+ this(LocalTransactionLogger.getInstance(), conf);
+ }
+
+ /**
+ * @param transactionLogger
+ * @param conf
+ */
+ public TransactionManager(final TransactionLogger transactionLogger,
+ final HBaseConfiguration conf) {
+ this.transactionLogger = transactionLogger;
+ connection = HConnectionManager.getConnection(conf);
+ }
+
+ /**
+ * Called to start a transaction.
+ *
+ * @return new transaction state
+ */
+ public TransactionState beginTransaction() {
+ long transactionId = transactionLogger.createNewTransactionLog();
+ LOG.debug("Begining transaction " + transactionId);
+ return new TransactionState(transactionId);
+ }
+
+ /**
+ * Prepare to commit a transaction.
+ *
+ * @param transactionState
+ * @return commitStatusCode (see {@link TransactionalRegionInterface})
+ * @throws IOException
+ * @throws CommitUnsuccessfulException
+ */
+ public int prepareCommit(final TransactionState transactionState)
+ throws CommitUnsuccessfulException, IOException {
+ boolean allReadOnly = true;
+ try {
+      Iterator<HRegionLocation> locationIterator =
+          transactionState.getParticipatingRegions().iterator();
+ while (locationIterator.hasNext()) {
+ HRegionLocation location = locationIterator.next();
+ TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+ .getHRegionConnection(location.getServerAddress());
+ int commitStatus = transactionalRegionServer.commitRequest(location
+ .getRegionInfo().getRegionName(), transactionState
+ .getTransactionId());
+ boolean canCommit = true;
+ switch (commitStatus) {
+ case TransactionalRegionInterface.COMMIT_OK:
+ allReadOnly = false;
+ break;
+ case TransactionalRegionInterface.COMMIT_OK_READ_ONLY:
+ locationIterator.remove(); // No need to doCommit for read-onlys
+ break;
+ case TransactionalRegionInterface.COMMIT_UNSUCESSFUL:
+ canCommit = false;
+ break;
+ default:
+ throw new CommitUnsuccessfulException(
+ "Unexpected return code from prepareCommit: " + commitStatus);
+ }
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Region ["
+ + location.getRegionInfo().getRegionNameAsString() + "] votes "
+ + (canCommit ? "to commit" : "to abort") + " transaction "
+ + transactionState.getTransactionId());
+ }
+
+ if (!canCommit) {
+ LOG.debug("Aborting [" + transactionState.getTransactionId() + "]");
+ abort(transactionState, location);
+ throw new CommitUnsuccessfulException();
+ }
+ }
+ } catch (Exception e) {
+ LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
+ + "] was unsucsessful", e);
+ // This happens on a NSRE that is triggered by a split
+ // FIXME, but then abort fails
+ try {
+ abort(transactionState);
+ } catch (Exception abortException) {
+ LOG.warn("Exeption durring abort", abortException);
+ }
+ throw new CommitUnsuccessfulException(e);
+ }
+ return allReadOnly ? TransactionalRegionInterface.COMMIT_OK_READ_ONLY : TransactionalRegionInterface.COMMIT_OK;
+ }
+
+ /**
+ * Try and commit a transaction. This does both phases of the 2-phase protocol: prepare and commit.
+ *
+ * @param transactionState
+ * @throws IOException
+ * @throws CommitUnsuccessfulException
+ */
+ public void tryCommit(final TransactionState transactionState)
+ throws CommitUnsuccessfulException, IOException {
+ long startTime = System.currentTimeMillis();
+ LOG.debug("atempting to commit trasaction: " + transactionState.toString());
+ int status = prepareCommit(transactionState);
+
+ if (status == TransactionalRegionInterface.COMMIT_OK) {
+ doCommit(transactionState);
+ }
+ LOG.debug("Committed transaction ["+transactionState.getTransactionId()+"] in ["+((System.currentTimeMillis()-startTime))+"]ms");
+ }
+
+ /** Do the commit. This is the 2nd phase of the 2-phase protocol.
+ *
+ * @param transactionState
+ * @throws CommitUnsuccessfulException
+ */
+ public void doCommit(final TransactionState transactionState)
+ throws CommitUnsuccessfulException{
+ try {
+ LOG.debug("Commiting [" + transactionState.getTransactionId() + "]");
+
+ transactionLogger.setStatusForTransaction(transactionState
+ .getTransactionId(), TransactionLogger.TransactionStatus.COMMITTED);
+
+ for (HRegionLocation location : transactionState
+ .getParticipatingRegions()) {
+ TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+ .getHRegionConnection(location.getServerAddress());
+ transactionalRegionServer.commit(location.getRegionInfo()
+ .getRegionName(), transactionState.getTransactionId());
+ }
+ } catch (Exception e) {
+ LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
+ + "] was unsucsessful", e);
+ // This happens on a NSRE that is triggered by a split
+ // FIXME, but then abort fails
+ try {
+ abort(transactionState);
+ } catch (Exception abortException) {
+ LOG.warn("Exeption durring abort", abortException);
+ }
+ throw new CommitUnsuccessfulException(e);
+ }
+ // TODO: Transaction log can be deleted now ...
+ }
+
+ /**
+   * Abort a transaction.
+ *
+ * @param transactionState
+ * @throws IOException
+ */
+ public void abort(final TransactionState transactionState) throws IOException {
+ abort(transactionState, null);
+ }
+
+ private void abort(final TransactionState transactionState,
+ final HRegionLocation locationToIgnore) throws IOException {
+ transactionLogger.setStatusForTransaction(transactionState
+ .getTransactionId(), TransactionLogger.TransactionStatus.ABORTED);
+
+ for (HRegionLocation location : transactionState.getParticipatingRegions()) {
+ if (locationToIgnore != null && location.equals(locationToIgnore)) {
+ continue;
+ }
+ try {
+ TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
+ .getHRegionConnection(location.getServerAddress());
+
+ transactionalRegionServer.abort(location.getRegionInfo()
+ .getRegionName(), transactionState.getTransactionId());
+ } catch (UnknownTransactionException e) {
+ LOG
+ .debug("Got unknown transaciton exception durring abort. Transaction: ["
+ + transactionState.getTransactionId()
+ + "], region: ["
+ + location.getRegionInfo().getRegionNameAsString()
+ + "]. Ignoring.");
+ } catch (NotServingRegionException e) {
+ LOG
+ .debug("Got NSRE durring abort. Transaction: ["
+ + transactionState.getTransactionId() + "], region: ["
+ + location.getRegionInfo().getRegionNameAsString()
+ + "]. Ignoring.");
+ }
+ }
+ }
+
+ public synchronized JtaXAResource getXAResource() {
+ if (xAResource == null){
+ xAResource = new JtaXAResource(this);
+ }
+ return xAResource;
+ }
+}
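The expected client flow, as a sketch; the table operations themselves go through the transaction-aware table classes (which register each region they touch with the TransactionState), so they are elided here:

    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
    import org.apache.hadoop.hbase.client.transactional.TransactionManager;
    import org.apache.hadoop.hbase.client.transactional.TransactionState;

    public class CommitFlowExample {
      public static void main(String[] args) throws Exception {
        TransactionManager manager =
            new TransactionManager(new HBaseConfiguration());
        TransactionState state = manager.beginTransaction();
        try {
          // ... reads and writes against transactional tables go here ...
          manager.tryCommit(state); // prepare, then commit if all regions vote OK
        } catch (CommitUnsuccessfulException e) {
          // The transaction was already aborted during the failed commit.
          System.err.println("Transaction aborted: " + e);
        }
      }
    }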
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
new file mode 100644
index 00000000000..027c20094b8
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java
@@ -0,0 +1,50 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+
+class TransactionScannerCallable extends ScannerCallable {
+
+ private TransactionState transactionState;
+
+ TransactionScannerCallable(final TransactionState transactionState,
+ final HConnection connection, final byte[] tableName,
+ final byte[] startRow, Scan scan) {
+ super(connection, tableName, startRow, scan);
+ this.transactionState = transactionState;
+ }
+
+ @Override
+ protected long openScanner() throws IOException {
+ if (transactionState.addRegion(location)) {
+ ((TransactionalRegionInterface) server).beginTransaction(transactionState
+ .getTransactionId(), location.getRegionInfo().getRegionName());
+ }
+ return ((TransactionalRegionInterface) server).openScanner(transactionState
+ .getTransactionId(), this.location.getRegionInfo().getRegionName(),
+ getScan());
+ }
+}
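
TransactionScannerCallable's openScanner() override enlists the scanned region in the transaction (issuing beginTransaction on first contact) before opening the server-side scanner. Below is a hedged sketch of how such a callable is driven through the connection's retry helper, mirroring how the stock ClientScanner drives ScannerCallable; the patch's actual scanner plumbing is not shown in this hunk.

    // Illustrative only: in practice a ClientScanner-style loop owns this.
    TransactionScannerCallable callable = new TransactionScannerCallable(
        tx, connection, tableName, startRow, scan);
    // The first invocation opens the scanner (enlisting the region);
    // subsequent invocations fetch the next batch of Results.
    Result[] batch = connection.getRegionServerWithRetries(callable);
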
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
new file mode 100644
index 00000000000..8c2f9805953
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java
@@ -0,0 +1,78 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.HRegionLocation;
+
+/**
+ * Holds client-side transaction information. Clients use it as an opaque
+ * object passed around to transaction operations.
+ *
+ */
+public class TransactionState {
+ static final Log LOG = LogFactory.getLog(TransactionState.class);
+
+ private final long transactionId;
+
+  private Set<HRegionLocation> participatingRegions =
+      new HashSet<HRegionLocation>();
+
+ TransactionState(final long transactionId) {
+ this.transactionId = transactionId;
+ }
+
+ boolean addRegion(final HRegionLocation hregion) {
+ boolean added = participatingRegions.add(hregion);
+
+ if (added) {
+ LOG.debug("Adding new hregion ["
+ + hregion.getRegionInfo().getRegionNameAsString()
+ + "] to transaction [" + transactionId + "]");
+ }
+
+ return added;
+ }
+
+  Set<HRegionLocation> getParticipatingRegions() {
+ return participatingRegions;
+ }
+
+ /**
+ * Get the transactionId.
+ *
+ * @return Return the transactionId.
+ */
+ public long getTransactionId() {
+ return transactionId;
+ }
+
+ /**
+ * @see java.lang.Object#toString()
+ */
+ @Override
+ public String toString() {
+ return "id: " + transactionId + ", particpants: "
+ + participatingRegions.size();
+ }
+}
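
The HashSet of participating regions is what makes enlistment idempotent: addRegion() returns true only on a region's first contact, and the callables in this patch key the beginTransaction RPC on that. A small sketch of the contract, with tx, location, and server standing in for values a callable already holds:

    if (tx.addRegion(location)) {
      // First contact between this region and tx: open the server-side
      // transactional context before issuing the actual operation.
      server.beginTransaction(tx.getTransactionId(),
          location.getRegionInfo().getRegionName());
    }
    // A later addRegion() with an equal HRegionLocation returns false,
    // so beginTransaction is sent at most once per region per transaction.
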
diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
new file mode 100644
index 00000000000..e18d4159c91
--- /dev/null
+++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java
@@ -0,0 +1,190 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HConnection;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.ScannerCallable;
+import org.apache.hadoop.hbase.client.ServerCallable;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Table with transactional support.
+ *
+ */
+public class TransactionalTable extends HTable {
+
+ /**
+ * @param conf
+ * @param tableName
+ * @throws IOException
+ */
+ public TransactionalTable(final HBaseConfiguration conf,
+ final String tableName) throws IOException {
+ this(conf, Bytes.toBytes(tableName));
+ }
+
+ /**
+ * @param conf
+ * @param tableName
+ * @throws IOException
+ */
+ public TransactionalTable(final HBaseConfiguration conf,
+ final byte[] tableName) throws IOException {
+ super(conf, tableName);
+ }
+
+  private static abstract class TransactionalServerCallable<T> extends
+      ServerCallable<T> {
+ protected TransactionState transactionState;
+
+ protected TransactionalRegionInterface getTransactionServer() {
+ return (TransactionalRegionInterface) server;
+ }
+
+ protected void recordServer() throws IOException {
+ if (transactionState.addRegion(location)) {
+ getTransactionServer().beginTransaction(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName());
+ }
+ }
+
+ /**
+ * @param connection
+ * @param tableName
+ * @param row
+ * @param transactionState
+ */
+ public TransactionalServerCallable(final HConnection connection,
+ final byte[] tableName, final byte[] row,
+ final TransactionState transactionState) {
+ super(connection, tableName, row);
+ this.transactionState = transactionState;
+ }
+
+ }
+
+ /**
+   * Method for getting data from a row.
+   * @param transactionState the transaction the get runs within
+   * @param get the Get to fetch
+ * @return the result
+ * @throws IOException
+ * @since 0.20.0
+ */
+ public Result get(final TransactionState transactionState, final Get get) throws IOException {
+ return super.getConnection().getRegionServerWithRetries(
+        new TransactionalServerCallable<Result>(super.getConnection(), super
+ .getTableName(), get.getRow(), transactionState) {
+ public Result call() throws IOException {
+ recordServer();
+ return getTransactionServer().get(
+ transactionState.getTransactionId(),
+ location.getRegionInfo().getRegionName(), get);
+ }
+ });
+ }
+
+ /**
+   * Delete the specified cells/row within the given transaction.
+   * @param transactionState the transaction the delete runs within
+   * @param delete the Delete to apply
+ * @throws IOException
+ * @since 0.20.0
+ */
+ public void delete(final TransactionState transactionState, final Delete delete)
+ throws IOException {
+ super.getConnection().getRegionServerWithRetries(
+ new TransactionalServerCallable