diff --git a/CHANGES.txt b/CHANGES.txt index 5abe83aa972..946f699c30a 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -49,6 +49,8 @@ Release 0.3.0 - Unreleased were present in previous versions of the patches for this issue, but not in the version that was committed. Also fix a number of compilation problems that were introduced by patch. + HBASE-669 MultiRegion transactions with Optimistic Concurrency Control + (Clint Morgan via Stack) OPTIMIZATIONS diff --git a/src/java/org/apache/hadoop/hbase/HMerge.java b/src/java/org/apache/hadoop/hbase/HMerge.java index f87b6c7d577..2e0cc8bd1e1 100644 --- a/src/java/org/apache/hadoop/hbase/HMerge.java +++ b/src/java/org/apache/hadoop/hbase/HMerge.java @@ -150,12 +150,13 @@ class HMerge implements HConstants { for (int i = 0; i < info.length - 1; i++) { if (currentRegion == null) { currentRegion = - new HRegion(tabledir, hlog, fs, conf, info[i], null, null); + new HRegion(tabledir, hlog, fs, conf, info[i], null); + currentRegion.initialize(null, null); currentSize = currentRegion.getLargestHStoreSize(); } nextRegion = - new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null); - + new HRegion(tabledir, hlog, fs, conf, info[i + 1], null); + nextRegion.initialize(null, null); nextSize = nextRegion.getLargestHStoreSize(); if ((currentSize + nextSize) <= (maxFilesize / 2)) { @@ -322,7 +323,8 @@ class HMerge implements HConstants { // Scan root region to find all the meta regions root = new HRegion(rootTableDir, hlog, fs, conf, - HRegionInfo.ROOT_REGIONINFO, null, null); + HRegionInfo.ROOT_REGIONINFO, null); + root.initialize(null, null); InternalScanner rootScanner = root.getScanner(COL_REGIONINFO_ARRAY, HConstants.EMPTY_START_ROW, diff --git a/src/java/org/apache/hadoop/hbase/client/HTable.java b/src/java/org/apache/hadoop/hbase/client/HTable.java index 9771e0db524..acc930bfbcf 100644 --- a/src/java/org/apache/hadoop/hbase/client/HTable.java +++ b/src/java/org/apache/hadoop/hbase/client/HTable.java @@ -982,7 +982,9 @@ public class HTable { public Scanner getScanner(final byte [][] columns, final byte [] startRow, long timestamp, RowFilterInterface filter) throws IOException { - return new ClientScanner(columns, startRow, timestamp, filter); + ClientScanner s = new ClientScanner(columns, startRow, timestamp, filter); + s.initialize(); + return s; } /** @@ -1335,15 +1337,13 @@ public class HTable { protected RowFilterInterface filter; protected ClientScanner(final Text [] columns, final Text startRow, - long timestamp, RowFilterInterface filter) - throws IOException { + long timestamp, RowFilterInterface filter) { this(Bytes.toByteArrays(columns), startRow.getBytes(), timestamp, filter); } protected ClientScanner(final byte[][] columns, final byte [] startRow, - final long timestamp, final RowFilterInterface filter) - throws IOException { + final long timestamp, final RowFilterInterface filter) { if (CLIENT_LOG.isDebugEnabled()) { CLIENT_LOG.debug("Creating scanner over " + Bytes.toString(getTableName()) + " starting at key '" + Bytes.toString(startRow) + "'"); @@ -1359,6 +1359,9 @@ public class HTable { if (filter != null) { filter.validate(columns); } + } + + public void initialize() throws IOException { nextScanner(); } diff --git a/src/java/org/apache/hadoop/hbase/client/ServerCallable.java b/src/java/org/apache/hadoop/hbase/client/ServerCallable.java index 74b23e9fcdd..0172ff716d6 100644 --- a/src/java/org/apache/hadoop/hbase/client/ServerCallable.java +++ b/src/java/org/apache/hadoop/hbase/client/ServerCallable.java @@ -60,11 +60,17 @@ public 
abstract class ServerCallable implements Callable { /** @return the server name */ public String getServerName() { + if (location == null) { + return null; + } return location.getServerAddress().toString(); } /** @return the region name */ - public byte [] getRegionName() { + public byte[] getRegionName() { + if (location == null) { + return null; + } return location.getRegionInfo().getRegionName(); } diff --git a/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java b/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java new file mode 100644 index 00000000000..6f305f102ab --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/transactional/CommitUnsuccessfulException.java @@ -0,0 +1,43 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.transactional; + +/** Thrown when a transaction cannot be committed. + * + */ +public class CommitUnsuccessfulException extends Exception { + + public CommitUnsuccessfulException() { + super(); + } + + public CommitUnsuccessfulException(String arg0, Throwable arg1) { + super(arg0, arg1); + } + + public CommitUnsuccessfulException(String arg0) { + super(arg0); + } + + public CommitUnsuccessfulException(Throwable arg0) { + super(arg0); + } + +} diff --git a/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java b/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java new file mode 100644 index 00000000000..4f4ce096beb --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/transactional/LocalTransactionLogger.java @@ -0,0 +1,66 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.client.transactional; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Random; + +/** + * A local, in-memory implementation of the transaction logger. Does not provide a global view, so + * it can't be relied on by region servers recovering from a failure. + * + */ +public class LocalTransactionLogger implements TransactionLogger { + + private static LocalTransactionLogger instance; + + public synchronized static LocalTransactionLogger getInstance() { + if (instance == null) { + instance = new LocalTransactionLogger(); + } + return instance; + } + + private Random random = new Random(); + private Map<Long, TransactionStatus> transactionIdToStatusMap = Collections + .synchronizedMap(new HashMap<Long, TransactionStatus>()); + + private LocalTransactionLogger() { + // Enforce singleton + } + + // Gives back random longs to minimize the possibility of collision + public long createNewTransactionLog() { + long id = random.nextLong(); + transactionIdToStatusMap.put(id, TransactionStatus.PENDING); + return id; + } + + public TransactionStatus getStatusForTransaction(final long transactionId) { + return transactionIdToStatusMap.get(transactionId); + } + + public void setStatusForTransaction(final long transactionId, + final TransactionStatus status) { + transactionIdToStatusMap.put(transactionId, status); + } +} diff --git a/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java new file mode 100644 index 00000000000..f815c200d7c --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java @@ -0,0 +1,45 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.transactional; + +/** + * Simple interface used to provide a log about transaction status. Written to + * by the client, and read by regionservers in case of failure. + * + */ +public interface TransactionLogger { + + enum TransactionStatus { + PENDING, COMMITTED, ABORTED + } + + /** + * Create a new transaction log. Return the transaction's globally unique id.
+ * Log's initial value should be PENDING + * + * @return transaction id + */ + long createNewTransactionLog(); + + TransactionStatus getStatusForTransaction(long transactionId); + + void setStatusForTransaction(long transactionId, TransactionStatus status); + +} diff --git a/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java new file mode 100644 index 00000000000..6237ecccc96 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java @@ -0,0 +1,145 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.transactional; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionLocation; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HConnectionManager; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; +import org.apache.hadoop.ipc.RemoteException; + +/** + * Transaction Manager. Responsible for committing transactions. + * + */ +public class TransactionManager { + static final Log LOG = LogFactory.getLog(TransactionManager.class); + + private final HConnection connection; + private final TransactionLogger transactionLogger; + + public TransactionManager(final HBaseConfiguration conf) { + this(LocalTransactionLogger.getInstance(), conf); + } + + public TransactionManager(final TransactionLogger transactionLogger, + final HBaseConfiguration conf) { + this.transactionLogger = transactionLogger; + connection = HConnectionManager.getConnection(conf); + } + + /** + * Called to start a transaction. + * + * @return new transaction state + */ + public TransactionState beginTransaction() { + long transactionId = transactionLogger.createNewTransactionLog(); + LOG.debug("Begining transaction " + transactionId); + return new TransactionState(transactionId); + } + + /** + * Try and commit a transaction. 
* + * @param transactionState + * @throws CommitUnsuccessfulException + * @throws IOException + */ + public void tryCommit(final TransactionState transactionState) + throws CommitUnsuccessfulException, IOException { + LOG.debug("Attempting to commit transaction: " + transactionState.toString()); + + try { + for (HRegionLocation location : transactionState + .getParticipatingRegions()) { + TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection + .getHRegionConnection(location.getServerAddress()); + boolean canCommit = transactionalRegionServer.commitRequest(location + .getRegionInfo().getRegionName(), transactionState + .getTransactionId()); + if (LOG.isTraceEnabled()) { + LOG.trace("Region [" + + location.getRegionInfo().getRegionNameAsString() + "] votes " + + (canCommit ? "to commit" : "to abort") + " transaction " + + transactionState.getTransactionId()); + } + + if (!canCommit) { + LOG.debug("Aborting [" + transactionState.getTransactionId() + "]"); + abort(transactionState, location); + throw new CommitUnsuccessfulException(); + } + } + + LOG.debug("Committing [" + transactionState.getTransactionId() + "]"); + + transactionLogger.setStatusForTransaction(transactionState + .getTransactionId(), TransactionLogger.TransactionStatus.COMMITTED); + + for (HRegionLocation location : transactionState + .getParticipatingRegions()) { + TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection + .getHRegionConnection(location.getServerAddress()); + transactionalRegionServer.commit(location.getRegionInfo() + .getRegionName(), transactionState.getTransactionId()); + } + } catch (RemoteException e) { + LOG.debug("Commit of transaction [" + transactionState.getTransactionId() + + "] was unsuccessful", e); + // FIXME, think about the what ifs + throw new CommitUnsuccessfulException(e); + } + // Tran log can be deleted now ... + } + + /** + * Abort a transaction. + * + * @param transactionState + * @throws IOException + */ + public void abort(final TransactionState transactionState) throws IOException { + abort(transactionState, null); + } + + private void abort(final TransactionState transactionState, + final HRegionLocation locationToIgnore) throws IOException { + transactionLogger.setStatusForTransaction(transactionState + .getTransactionId(), TransactionLogger.TransactionStatus.ABORTED); + + for (HRegionLocation location : transactionState.getParticipatingRegions()) { + if (locationToIgnore != null && location.equals(locationToIgnore)) { + continue; + } + + TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection + .getHRegionConnection(location.getServerAddress()); + + transactionalRegionServer.abort(location.getRegionInfo().getRegionName(), + transactionState.getTransactionId()); + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java new file mode 100644 index 00000000000..081068f3098 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionScannerCallable.java @@ -0,0 +1,51 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.transactional; + +import java.io.IOException; + +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; + +class TransactionScannerCallable extends ScannerCallable { + + private TransactionState transactionState; + + TransactionScannerCallable(final TransactionState transactionState, + final HConnection connection, final byte[] tableName, + final byte[][] columns, final byte[] startRow, final long timestamp, + final RowFilterInterface filter) { + super(connection, tableName, columns, startRow, timestamp, filter); + this.transactionState = transactionState; + } + + @Override + protected long openScanner() throws IOException { + if (transactionState.addRegion(location)) { + ((TransactionalRegionInterface) server).beginTransaction(transactionState + .getTransactionId(), location.getRegionInfo().getRegionName()); + } + return ((TransactionalRegionInterface) server).openScanner(transactionState + .getTransactionId(), this.location.getRegionInfo().getRegionName(), + getColumns(), row, getTimestamp(), getFilter()); + } +} diff --git a/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java new file mode 100644 index 00000000000..fe3c7edc081 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionState.java @@ -0,0 +1,75 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.transactional; + +import java.util.HashSet; +import java.util.Set; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HRegionLocation; + +/** + * Holds client-side transaction information. Client's use them as opaque + * objects passed around to transaction operations. 
+ * + */ +public class TransactionState { + static final Log LOG = LogFactory.getLog(TransactionState.class); + + private final long transactionId; + + private Set participatingRegions = new HashSet(); + + TransactionState(final long transactionId) { + this.transactionId = transactionId; + } + + boolean addRegion(final HRegionLocation hregion) { + boolean added = participatingRegions.add(hregion); + + if (added) { + LOG.debug("Adding new hregion [" + + hregion.getRegionInfo().getRegionNameAsString() + + "] to transaction [" + transactionId + "]"); + } + + return added; + } + + Set getParticipatingRegions() { + return participatingRegions; + } + + /** + * Get the transactionId. + * + * @return Return the transactionId. + */ + public long getTransactionId() { + return transactionId; + } + + @Override + public String toString() { + return "id: " + transactionId + ", particpants: " + + participatingRegions.size(); + } +} diff --git a/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java new file mode 100644 index 00000000000..edbb4782a9c --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/transactional/TransactionalTable.java @@ -0,0 +1,401 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.transactional; + +import java.io.IOException; + +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.HConnection; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scanner; +import org.apache.hadoop.hbase.client.ScannerCallable; +import org.apache.hadoop.hbase.client.ServerCallable; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; +import org.apache.hadoop.io.Text; + +/** + * Table with transactional support. 
+ * + */ +public class TransactionalTable extends HTable { + + public TransactionalTable(final HBaseConfiguration conf, + final String tableName) throws IOException { + super(conf, tableName); + } + + public TransactionalTable(final HBaseConfiguration conf, final Text tableName) + throws IOException { + super(conf, tableName); + } + + public TransactionalTable(final HBaseConfiguration conf, + final byte[] tableName) throws IOException { + super(conf, tableName); + } + + private static abstract class TransactionalServerCallable extends + ServerCallable { + protected TransactionState transactionState; + + protected TransactionalRegionInterface getTransactionServer() { + return (TransactionalRegionInterface) server; + } + + protected void recordServer() throws IOException { + if (transactionState.addRegion(location)) { + getTransactionServer().beginTransaction( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName()); + } + } + + public TransactionalServerCallable(final HConnection connection, + final byte[] tableName, final byte[] row, + final TransactionState transactionState) { + super(connection, tableName, row); + this.transactionState = transactionState; + } + + } + + /** + * Get a single value for the specified row and column + * + * @param row row key + * @param column column name + * @return value for specified row/column + * @throws IOException + */ + public Cell get(final TransactionState transactionState, final byte[] row, + final byte[] column) throws IOException { + return super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public Cell call() throws IOException { + recordServer(); + return getTransactionServer().get( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, column); + } + }); + } + + /** + * Get the specified number of versions of the specified row and column + * + * @param row - row key + * @param column - column name + * @param numVersions - number of versions to retrieve + * @return - array byte values + * @throws IOException + */ + public Cell[] get(final TransactionState transactionState, final byte[] row, + final byte[] column, final int numVersions) throws IOException { + Cell[] values = null; + values = super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public Cell[] call() throws IOException { + recordServer(); + return getTransactionServer().get( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, column, + numVersions); + } + }); + + return values; + } + + /** + * Get the specified number of versions of the specified row and column with + * the specified timestamp. 
+ * + * @param row - row key + * @param column - column name + * @param timestamp - timestamp + * @param numVersions - number of versions to retrieve + * @return - array of values that match the above criteria + * @throws IOException + */ + public Cell[] get(final TransactionState transactionState, final byte[] row, + final byte[] column, final long timestamp, final int numVersions) + throws IOException { + Cell[] values = null; + values = super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public Cell[] call() throws IOException { + recordServer(); + return getTransactionServer().get( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, column, + timestamp, numVersions); + } + }); + + return values; + } + + /** + * Get all the data for the specified row at the latest timestamp + * + * @param row row key + * @return RowResult is empty if row does not exist. + * @throws IOException + */ + public RowResult getRow(final TransactionState transactionState, + final byte[] row) throws IOException { + return getRow(transactionState, row, HConstants.LATEST_TIMESTAMP); + } + + /** + * Get all the data for the specified row at a specified timestamp + * + * @param row row key + * @param ts timestamp + * @return RowResult is empty if row does not exist. + * @throws IOException + */ + public RowResult getRow(final TransactionState transactionState, + final byte[] row, final long ts) throws IOException { + return super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public RowResult call() throws IOException { + recordServer(); + return getTransactionServer().getRow( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, ts); + } + }); + } + + /** + * Get selected columns for the specified row at the latest timestamp + * + * @param row row key + * @param columns Array of column names you want to retrieve. + * @return RowResult is empty if row does not exist. + * @throws IOException + */ + public RowResult getRow(final TransactionState transactionState, + final byte[] row, final byte[][] columns) throws IOException { + return getRow(transactionState, row, columns, HConstants.LATEST_TIMESTAMP); + } + + /** + * Get selected columns for the specified row at a specified timestamp + * + * @param row row key + * @param columns Array of column names you want to retrieve. + * @param ts timestamp + * @return RowResult is empty if row does not exist. + * @throws IOException + */ + public RowResult getRow(final TransactionState transactionState, + final byte[] row, final byte[][] columns, final long ts) + throws IOException { + return super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public RowResult call() throws IOException { + recordServer(); + return getTransactionServer().getRow( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, columns, ts); + } + }); + } + + /** + * Delete all cells that match the passed row and whose timestamp is equal-to + * or older than the passed timestamp. + * + * @param row Row to update + * @param column name of column whose value is to be deleted + * @param ts Delete all cells of the same timestamp or older. 
+ * @throws IOException + */ + public void deleteAll(final TransactionState transactionState, + final byte[] row, final long ts) throws IOException { + super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), row, transactionState) { + public Boolean call() throws IOException { + recordServer(); + getTransactionServer().deleteAll( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), row, ts); + return null; + } + }); + } + + /** + * Get a scanner on the current table starting at first row. Return the + * specified columns. + * + * @param columns columns to scan. If column name is a column family, all + * columns of the specified column family are returned. Its also possible to + * pass a regex in the column qualifier. A column qualifier is judged to be a + * regex if it contains at least one of the following characters: + * \+|^&*$[]]}{)(. + * @return scanner + * @throws IOException + */ + public Scanner getScanner(final TransactionState transactionState, + final byte[][] columns) throws IOException { + return getScanner(transactionState, columns, HConstants.EMPTY_START_ROW, + HConstants.LATEST_TIMESTAMP, null); + } + + /** + * Get a scanner on the current table starting at the specified row. Return + * the specified columns. + * + * @param columns columns to scan. If column name is a column family, all + * columns of the specified column family are returned. Its also possible to + * pass a regex in the column qualifier. A column qualifier is judged to be a + * regex if it contains at least one of the following characters: + * \+|^&*$[]]}{)(. + * @param startRow starting row in table to scan + * @return scanner + * @throws IOException + */ + public Scanner getScanner(final TransactionState transactionState, + final byte[][] columns, final byte[] startRow) throws IOException { + return getScanner(transactionState, columns, startRow, + HConstants.LATEST_TIMESTAMP, null); + } + + /** + * Get a scanner on the current table starting at the specified row. Return + * the specified columns. + * + * @param columns columns to scan. If column name is a column family, all + * columns of the specified column family are returned. Its also possible to + * pass a regex in the column qualifier. A column qualifier is judged to be a + * regex if it contains at least one of the following characters: + * \+|^&*$[]]}{)(. + * @param startRow starting row in table to scan + * @param timestamp only return results whose timestamp <= this value + * @return scanner + * @throws IOException + */ + public Scanner getScanner(final TransactionState transactionState, + final byte[][] columns, final byte[] startRow, final long timestamp) + throws IOException { + return getScanner(transactionState, columns, startRow, timestamp, null); + } + + /** + * Get a scanner on the current table starting at the specified row. Return + * the specified columns. + * + * @param columns columns to scan. If column name is a column family, all + * columns of the specified column family are returned. Its also possible to + * pass a regex in the column qualifier. A column qualifier is judged to be a + * regex if it contains at least one of the following characters: + * \+|^&*$[]]}{)(. + * @param startRow starting row in table to scan + * @param filter a row filter using row-key regexp and/or column data filter. 
+ * @return scanner + * @throws IOException + */ + public Scanner getScanner(final TransactionState transactionState, + final byte[][] columns, final byte[] startRow, + final RowFilterInterface filter) throws IOException { + return getScanner(transactionState, columns, startRow, + HConstants.LATEST_TIMESTAMP, filter); + } + + /** + * Get a scanner on the current table starting at the specified row. Return + * the specified columns. + * + * @param columns columns to scan. If column name is a column family, all + * columns of the specified column family are returned. Its also possible to + * pass a regex in the column qualifier. A column qualifier is judged to be a + * regex if it contains at least one of the following characters: + * \+|^&*$[]]}{)(. + * @param startRow starting row in table to scan + * @param timestamp only return results whose timestamp <= this value + * @param filter a row filter using row-key regexp and/or column data filter. + * @return scanner + * @throws IOException + */ + public Scanner getScanner(final TransactionState transactionState, + final byte[][] columns, final byte[] startRow, final long timestamp, + final RowFilterInterface filter) throws IOException { + ClientScanner scanner = new TransactionalClientScanner(transactionState, columns, startRow, + timestamp, filter); + scanner.initialize(); + return scanner; + } + + /** + * Commit a BatchUpdate to the table. + * + * @param batchUpdate + * @throws IOException + */ + public synchronized void commit(final TransactionState transactionState, + final BatchUpdate batchUpdate) throws IOException { + super.getConnection().getRegionServerWithRetries( + new TransactionalServerCallable(super.getConnection(), super + .getTableName(), batchUpdate.getRow(), transactionState) { + public Boolean call() throws IOException { + recordServer(); + getTransactionServer().batchUpdate( + transactionState.getTransactionId(), + location.getRegionInfo().getRegionName(), batchUpdate); + return null; + } + }); + } + + protected class TransactionalClientScanner extends HTable.ClientScanner { + + private TransactionState transactionState; + + protected TransactionalClientScanner( + final TransactionState transactionState, final byte[][] columns, + final byte[] startRow, final long timestamp, + final RowFilterInterface filter) { + super(columns, startRow, timestamp, filter); + this.transactionState = transactionState; + } + + @Override + protected ScannerCallable getScannerCallable(final byte[] localStartKey) { + return new TransactionScannerCallable(transactionState, getConnection(), + getTableName(), getColumns(), localStartKey, getTimestamp(), + getFilter()); + } + } + +} diff --git a/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java b/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java new file mode 100644 index 00000000000..f46426cd2e9 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/transactional/UnknownTransactionException.java @@ -0,0 +1,41 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.transactional; + +import org.apache.hadoop.hbase.DoNotRetryIOException; + +/** + * Thrown if a region server is passed an unknown transaction id + */ + public class UnknownTransactionException extends DoNotRetryIOException { + + /** constructor */ + public UnknownTransactionException() { + super(); + } + + /** + * Constructor + * @param s message + */ + public UnknownTransactionException(String s) { + super(s); + } +} diff --git a/src/java/org/apache/hadoop/hbase/client/transactional/package.html b/src/java/org/apache/hadoop/hbase/client/transactional/package.html new file mode 100644 index 00000000000..0dd667a7431 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/client/transactional/package.html @@ -0,0 +1,59 @@ + + + + + + + + +This package provides support for atomic transactions. Transactions can +span multiple regions. Transaction writes are applied when committing a +transaction. At commit time, the transaction is examined to see if it +can be applied while still maintaining atomicity. This is done by +looking for conflicts with the transactions that committed while the +current transaction was running. This technique is known as optimistic +concurrency control (OCC) because it relies on the assumption that +transactions will mostly not have conflicts with each other. + +

+For more details on OCC, see the paper On Optimistic Methods for Concurrency Control +by Kung and Robinson.

To enable transactions, modify hbase-site.xml to turn on the +TransactionalRegionServer. This is done by setting +hbase.regionserver.class to +org.apache.hadoop.hbase.ipc.TransactionalRegionInterface and +hbase.regionserver.impl to +org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer. +
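Once the region server is configured this way, a client drives transactions through the classes added in this patch (TransactionManager, TransactionState, TransactionalTable). The following is a hypothetical usage sketch; the table name, row, column, and value are placeholders.

  import org.apache.hadoop.hbase.HBaseConfiguration;
  import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
  import org.apache.hadoop.hbase.client.transactional.TransactionManager;
  import org.apache.hadoop.hbase.client.transactional.TransactionState;
  import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
  import org.apache.hadoop.hbase.io.BatchUpdate;
  import org.apache.hadoop.hbase.io.Cell;
  import org.apache.hadoop.hbase.util.Bytes;

  public class TransactionExample {
    public static void main(String[] args) throws Exception {
      HBaseConfiguration conf = new HBaseConfiguration();
      TransactionManager txManager = new TransactionManager(conf);
      TransactionalTable table = new TransactionalTable(conf, "mytable");

      TransactionState tx = txManager.beginTransaction();
      try {
        // Reads and writes are tagged with the transaction state; writes are
        // only applied if the transaction later commits.
        Cell cell = table.get(tx, Bytes.toBytes("row1"), Bytes.toBytes("info:balance"));
        BatchUpdate update = new BatchUpdate(Bytes.toBytes("row1"));
        update.put(Bytes.toBytes("info:balance"), Bytes.toBytes("100"));
        table.commit(tx, update);
        // Two-phase commit across all regions the transaction touched.
        txManager.tryCommit(tx);
      } catch (CommitUnsuccessfulException e) {
        // A conflicting transaction committed first; this one was aborted.
      }
    }
  }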

Known Issues

+ +Recovery in the face of hregion server failure +is not fully implemented. Thus, you cannot rely on the transactional +properties in the face of node failure. + +

In order to avoid phantom reads on scanners, scanners currently +claim a write set for all rows in every region which they scan +through. This means that if transaction A writes to a region that +transaction B is scanning, then there is a conflict (only one +transaction can be committed). This will occur even if the scanner +never went over the row that was written. + + + diff --git a/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java b/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java new file mode 100644 index 00000000000..28199a6a265 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/ipc/TransactionalRegionInterface.java @@ -0,0 +1,178 @@ +/* + * $Id$ + * Created on Jun 4, 2008 + * + */ +package org.apache.hadoop.hbase.ipc; + +import java.io.IOException; + +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.RowResult; + +/** + * Interface for transactional region servers. + * + */ +public interface TransactionalRegionInterface extends HRegionInterface { + /** Interface version number */ + public static final long versionID = 1L; + + /** + * Sent to initiate a transaction. + * + * @param transactionId + * @param regionName name of region + */ + public void beginTransaction(long transactionId, final byte[] regionName) + throws IOException; + + /** + * Retrieve a single value from the specified region for the specified row and + * column keys + * + * @param regionName name of region + * @param row row key + * @param column column key + * @return value for that region/row/column + * @throws IOException + */ + public Cell get(long transactionId, final byte[] regionName, + final byte[] row, final byte[] column) throws IOException; + + /** + * Get the specified number of versions of the specified row and column + * + * @param regionName region name + * @param row row key + * @param column column key + * @param numVersions number of versions to return + * @return array of values + * @throws IOException + */ + public Cell[] get(long transactionId, final byte[] regionName, + final byte[] row, final byte[] column, final int numVersions) + throws IOException; + + /** + * Get the specified number of versions of the specified row and column with + * the specified timestamp. + * + * @param regionName region name + * @param row row key + * @param column column key + * @param timestamp timestamp + * @param numVersions number of versions to return + * @return array of values + * @throws IOException + */ + public Cell[] get(long transactionId, final byte[] regionName, + final byte[] row, final byte[] column, final long timestamp, + final int numVersions) throws IOException; + + /** + * Get all the data for the specified row at a given timestamp + * + * @param regionName region name + * @param row row key + * @return map of values + * @throws IOException + */ + public RowResult getRow(long transactionId, final byte[] regionName, + final byte[] row, final long ts) throws IOException; + + /** + * Get selected columns for the specified row at a given timestamp. + * + * @param regionName region name + * @param row row key + * @return map of values + * @throws IOException + */ + public RowResult getRow(long transactionId, final byte[] regionName, + final byte[] row, final byte[][] columns, final long ts) + throws IOException; + + /** + * Get selected columns for the specified row at the latest timestamp.
+ * + * @param regionName region name + * @param row row key + * @return map of values + * @throws IOException + */ + public RowResult getRow(long transactionId, final byte[] regionName, + final byte[] row, final byte[][] columns) throws IOException; + + /** + * Delete all cells that match the passed row and whose timestamp is equal-to + * or older than the passed timestamp. + * + * @param regionName region name + * @param row row key + * @param timestamp Delete all entries that have this timestamp or older + * @throws IOException + */ + public void deleteAll(long transactionId, byte[] regionName, byte[] row, + long timestamp) throws IOException; + + /** + * Opens a remote scanner with a RowFilter. + * + * @param transactionId + * @param regionName name of region to scan + * @param columns columns to scan. If column name is a column family, all + * columns of the specified column family are returned. Its also possible to + * pass a regex for column family name. A column name is judged to be regex if + * it contains at least one of the following characters: + * \+|^&*$[]]}{)(. + * @param startRow starting row to scan + * @param timestamp only return values whose timestamp is <= this value + * @param filter RowFilter for filtering results at the row-level. + * + * @return scannerId scanner identifier used in other calls + * @throws IOException + */ + public long openScanner(final long transactionId, final byte[] regionName, + final byte[][] columns, final byte[] startRow, long timestamp, + RowFilterInterface filter) throws IOException; + + /** + * Applies a batch of updates via one RPC + * + * @param regionName name of the region to update + * @param b BatchUpdate + * @throws IOException + */ + public void batchUpdate(long transactionId, final byte[] regionName, + final BatchUpdate b) throws IOException; + + /** + * Ask if we can commit the given transaction. + * + * @param transactionId + * @return true if we can commit + */ + public boolean commitRequest(final byte[] regionName, long transactionId) + throws IOException; + + /** + * Commit the transaction. + * + * @param transactionId + * @return + */ + public void commit(final byte[] regionName, long transactionId) + throws IOException; + + /** + * Abort the transaction. + * + * @param transactionId + * @return + */ + public void abort(final byte[] regionName, long transactionId) + throws IOException; +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HLog.java b/src/java/org/apache/hadoop/hbase/regionserver/HLog.java index ae2c92d44fc..e7c2c3c8393 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HLog.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HLog.java @@ -171,7 +171,7 @@ public class HLog implements HConstants { * Accessor for tests. * @return Current state of the monotonically increasing file id. */ - long getFilenum() { + public long getFilenum() { return this.filenum; } @@ -204,6 +204,10 @@ public class HLog implements HConstants { } } } + + public long getSequenceNumber() { + return logSeqNum; + } /** * Roll the log writer. That is, start writing log messages to a new file. @@ -311,7 +315,7 @@ public class HLog implements HConstants { * This is a convenience method that computes a new filename with a given * file-number. 
*/ - Path computeFilename(final long fn) { + public Path computeFilename(final long fn) { return new Path(dir, HLOG_DATFILE + fn); } @@ -330,7 +334,7 @@ public class HLog implements HConstants { * * @throws IOException */ - void close() throws IOException { + public void close() throws IOException { cacheFlushLock.lock(); try { synchronized (updateLock) { @@ -391,13 +395,8 @@ public class HLog implements HConstants { new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]); HLogEdit logEdit = new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp()); - try { - this.writer.append(logKey, logEdit); - } catch (IOException e) { - LOG.fatal("Could not append. Requesting close of log", e); - requestLogRoll(); - throw e; - } + doWrite(logKey, logEdit); + this.numEntries++; } } @@ -411,6 +410,63 @@ public class HLog implements HConstants { this.listener.logRollRequested(); } } + + private void doWrite(HLogKey logKey, HLogEdit logEdit) throws IOException { + try { + this.writer.append(logKey, logEdit); + } catch (IOException e) { + LOG.fatal("Could not append. Requesting close of log", e); + requestLogRoll(); + throw e; + } + } + + /** Append an entry without a row to the log. + * + * @param regionInfo + * @param logEdit + * @throws IOException + */ + public void append(HRegionInfo regionInfo, HLogEdit logEdit) throws IOException { + this.append(regionInfo, new byte[0], logEdit); + } + + /** Append an entry to the log. + * + * @param regionName + * @param tableName + * @param row + * @param logEdit + * @throws IOException + */ + public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit) throws IOException { + if (closed) { + throw new IOException("Cannot append; log is closed"); + } + byte [] regionName = regionInfo.getRegionName(); + byte [] tableName = regionInfo.getTableDesc().getName(); + + synchronized (updateLock) { + long seqNum = obtainSeqNum(); + // The 'lastSeqWritten' map holds the sequence number of the oldest + // write for each region. When the cache is flushed, the entry for the + // region being flushed is removed if the sequence number of the flush + // is greater than or equal to the value in lastSeqWritten. + if (!this.lastSeqWritten.containsKey(regionName)) { + this.lastSeqWritten.put(regionName, Long.valueOf(seqNum)); + } + + HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum); + doWrite(logKey, logEdit); + this.numEntries++; + } + + if (this.numEntries > this.maxlogentries) { + if (listener != null) { + listener.logRollRequested(); + } + } + } /** @return How many items have been added to the log */ int getNumEntries() { @@ -508,6 +564,10 @@ public class HLog implements HConstants { this.cacheFlushLock.unlock(); } + public static boolean isMetaColumn(byte [] column) { + return Bytes.equals(METACOLUMN, column); + } + /** * Split up a bunch of log files, that are no longer being written to, into * new files, one per region. Delete the old log files when finished. 
diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java b/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java index c85ea33ba4d..0349f480832 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HLogEdit.java @@ -19,6 +19,7 @@ */ package org.apache.hadoop.hbase.regionserver; +import org.apache.hadoop.hbase.io.BatchOperation; import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.*; @@ -62,11 +63,20 @@ public class HLogEdit implements Writable, HConstants { public static boolean isDeleted(final byte [] value) { return (value == null)? false: deleteBytes.compareTo(value) == 0; } + + public enum TransactionalOperation { + START, WRITE, COMMIT, ABORT + } private byte [] column; private byte [] val; private long timestamp; private static final int MAX_VALUE_LEN = 128; + + private boolean isTransactionEntry; + private Long transactionId = null; + private TransactionalOperation operation; + /** * Default constructor used by Writable @@ -85,6 +95,34 @@ public class HLogEdit implements Writable, HConstants { this.column = c; this.val = bval; this.timestamp = timestamp; + this.isTransactionEntry = false; + } + + /** Construct a WRITE transaction. + * + * @param transactionId + * @param op + * @param timestamp + */ + public HLogEdit(long transactionId, BatchOperation op, long timestamp) { + this(op.getColumn(), op.getValue(), timestamp); + // This covers delete ops too... + this.transactionId = transactionId; + this.operation = TransactionalOperation.WRITE; + this.isTransactionEntry = true; + } + + /** Construct a transactional operation (BEGIN, ABORT, or COMMIT). + * + * @param transactionId + * @param op + */ + public HLogEdit(long transactionId, TransactionalOperation op) { + this.column = new byte[0]; + this.val = new byte[0]; + this.transactionId = transactionId; + this.operation = op; + this.isTransactionEntry = true; } /** @return the column */ @@ -101,6 +139,28 @@ public class HLogEdit implements Writable, HConstants { public long getTimestamp() { return this.timestamp; } + + public boolean isTransactionEntry() { + return isTransactionEntry; + } + + /** + * Get the transactionId, or null if this is not a transactional edit. + * + * @return Return the transactionId. + */ + public Long getTransactionId() { + return transactionId; + } + + /** + * Get the operation. + * + * @return Return the operation. + */ + public TransactionalOperation getOperation() { + return operation; + } /** * @return First column name, timestamp, and first 128 bytes of the value @@ -117,8 +177,13 @@ public class HLogEdit implements Writable, HConstants { } catch (UnsupportedEncodingException e) { throw new RuntimeException("UTF8 encoding not present?", e); } - return "(" + Bytes.toString(getColumn()) + "/" + getTimestamp() + "/" + - value + ")"; + return "(" + + Bytes.toString(getColumn()) + + "/" + + getTimestamp() + + "/" + + (isTransactionEntry ? 
"tran: " + transactionId + " op " + + operation.toString() +"/": "") + value + ")"; } // Writable @@ -126,9 +191,18 @@ public class HLogEdit implements Writable, HConstants { /** {@inheritDoc} */ public void write(DataOutput out) throws IOException { Bytes.writeByteArray(out, this.column); - out.writeInt(this.val.length); - out.write(this.val); + if (this.val == null) { + out.writeInt(0); + } else { + out.writeInt(this.val.length); + out.write(this.val); + } out.writeLong(timestamp); + out.writeBoolean(isTransactionEntry); + if (isTransactionEntry) { + out.writeLong(transactionId); + out.writeUTF(operation.name()); + } } /** {@inheritDoc} */ @@ -137,5 +211,10 @@ public class HLogEdit implements Writable, HConstants { this.val = new byte[in.readInt()]; in.readFully(this.val); this.timestamp = in.readLong(); + isTransactionEntry = in.readBoolean(); + if (isTransactionEntry) { + transactionId = in.readLong(); + operation = TransactionalOperation.valueOf(in.readUTF()); + } } } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java b/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java index f77aeb4f798..7e9d15e3f44 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HLogKey.java @@ -30,6 +30,8 @@ import java.io.*; * The log intermingles edits to many tables and rows, so each log entry * identifies the appropriate table and row. Within a table and row, they're * also sorted. + * + * Some Transactional edits (START, COMMIT, ABORT) will not have an associated row. */ public class HLogKey implements WritableComparable { private byte [] regionName; @@ -64,19 +66,19 @@ public class HLogKey implements WritableComparable { // A bunch of accessors ////////////////////////////////////////////////////////////////////////////// - byte [] getRegionName() { + public byte [] getRegionName() { return regionName; } - byte [] getTablename() { + public byte [] getTablename() { return tablename; } - byte [] getRow() { + public byte [] getRow() { return row; } - long getLogSeqNum() { + public long getLogSeqNum() { return logSeqNum; } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 3d224d00d2c..927f3830456 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -243,8 +243,8 @@ public class HRegion implements HConstants { LOG.debug("Files for new region"); listPaths(fs, newRegionDir); } - HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, - null, null); + HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null); + dstRegion.initialize(null, null); dstRegion.compactStores(); if (LOG.isDebugEnabled()) { LOG.debug("Files for new region"); @@ -318,7 +318,7 @@ public class HRegion implements HConstants { private final Map> targetColumns = new ConcurrentHashMap>(); // Default access because read by tests. 
- final Map stores = new ConcurrentHashMap(); + protected final Map stores = new ConcurrentHashMap(); final AtomicLong memcacheSize = new AtomicLong(0); final Path basedir; @@ -376,7 +376,7 @@ public class HRegion implements HConstants { private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock(); private final Integer splitLock = new Integer(0); - private final long minSequenceId; + private long minSequenceId; final AtomicInteger activeScannerCount = new AtomicInteger(0); ////////////////////////////////////////////////////////////////////////////// @@ -395,31 +395,6 @@ public class HRegion implements HConstants { * appropriate log info for this HRegion. If there is a previous log file * (implying that the HRegion has been written-to before), then read it from * the supplied path. - * @param fs is the filesystem. - * @param conf is global configuration settings. - * @param regionInfo - HRegionInfo that describes the region - * @param initialFiles If there are initial files (implying that the HRegion - * is new), then read them from the supplied path. - * @param flushListener an object that implements CacheFlushListener or null - * or null - * @throws IOException - */ - public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, - FlushRequester flushListener) throws IOException { - this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null); - } - - /** - * HRegion constructor. - * - * @param log The HLog is the outbound log for any updates to the HRegion - * (There's a single HLog for all the HRegions on a single HRegionServer.) - * The log file is a logfile from the previous execution that's - * custom-computed for this HRegion. The HRegionServer computes and sorts the - * appropriate log info for this HRegion. If there is a previous log file - * (implying that the HRegion has been written-to before), then read it from - * the supplied path. * @param basedir qualified path of directory where region should be located, * usually the table directory. * @param fs is the filesystem. 
@@ -434,10 +409,8 @@ public class HRegion implements HConstants { * @throws IOException */ public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf, - HRegionInfo regionInfo, Path initialFiles, - FlushRequester flushListener, final Progressable reporter) - throws IOException { - + HRegionInfo regionInfo, + FlushRequester flushListener) { this.basedir = basedir; this.log = log; this.fs = fs; @@ -447,7 +420,6 @@ public class HRegion implements HConstants { this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000); String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName()); this.regiondir = new Path(basedir, encodedNameStr); - Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME); this.historian = RegionHistorian.getInstance(); if (LOG.isDebugEnabled()) { @@ -457,6 +429,27 @@ public class HRegion implements HConstants { this.regionCompactionDir = new Path(getCompactionDir(basedir), encodedNameStr); + + int flushSize = regionInfo.getTableDesc().getMemcacheFlushSize(); + if (flushSize == HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE) { + flushSize = conf.getInt("hbase.hregion.memcache.flush.size", + HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE); + } + this.memcacheFlushSize = flushSize; + + this.blockingMemcacheSize = this.memcacheFlushSize * + conf.getInt("hbase.hregion.memcache.block.multiplier", 1); + } + + /** Initialize this region and get it ready to roll. + * + * @param initialFiles + * @param reporter + * @throws IOException + */ + public void initialize( Path initialFiles, + final Progressable reporter) throws IOException { + Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME); // Move prefab HStore files into place (if any). This picks up split files // and any merges from splits and merges dirs. @@ -466,16 +459,20 @@ public class HRegion implements HConstants { // Load in all the HStores. long maxSeqId = -1; + long minSeqId = Integer.MAX_VALUE; for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) { HStore store = instantiateHStore(this.basedir, c, oldLogFile, reporter); stores.put(Bytes.mapKey(c.getName()), store); long storeSeqId = store.getMaxSequenceId(); if (storeSeqId > maxSeqId) { maxSeqId = storeSeqId; + } + if (storeSeqId < minSeqId) { + minSeqId = storeSeqId; } } - doReconstructionLog(oldLogFile, maxSeqId, reporter); + doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter); if (fs.exists(oldLogFile)) { if (LOG.isDebugEnabled()) { @@ -501,17 +498,6 @@ public class HRegion implements HConstants { if (fs.exists(merges)) { fs.delete(merges, true); } - - int flushSize = regionInfo.getTableDesc().getMemcacheFlushSize(); - if (flushSize == HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE) { - flushSize = conf.getInt("hbase.hregion.memcache.flush.size", - HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE); - } - this.memcacheFlushSize = flushSize; - - this.blockingMemcacheSize = this.memcacheFlushSize * - conf.getInt("hbase.hregion.memcache.block.multiplier", 1); - // See if region is meant to run read-only. if (this.regionInfo.getTableDesc().isReadOnly()) { this.writestate.setReadOnly(true); @@ -797,10 +783,12 @@ public class HRegion implements HConstants { // Opening the region copies the splits files from the splits directory // under each region. 
HRegion regionA = - new HRegion(basedir, log, fs, conf, regionAInfo, dirA, null); + new HRegion(basedir, log, fs, conf, regionAInfo, null); + regionA.initialize(dirA, null); regionA.close(); HRegion regionB = - new HRegion(basedir, log, fs, conf, regionBInfo, dirB, null); + new HRegion(basedir, log, fs, conf, regionBInfo, null); + regionB.initialize(dirB, null); regionB.close(); // Cleanup @@ -1029,12 +1017,14 @@ public class HRegion implements HConstants { // again so its value will represent the size of the updates received // during the flush long sequenceId = -1L; + long completeSequenceId = -1L; this.updatesLock.writeLock().lock(); try { for (HStore s: stores.values()) { s.snapshot(); } sequenceId = log.startCacheFlush(); + completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId); this.memcacheSize.set(0); } finally { this.updatesLock.writeLock().unlock(); @@ -1050,7 +1040,7 @@ public class HRegion implements HConstants { // Keep running vector of all store files that includes both old and the // just-made new flush store file. for (HStore hstore: stores.values()) { - boolean needsCompaction = hstore.flushCache(sequenceId); + boolean needsCompaction = hstore.flushCache(completeSequenceId); if (needsCompaction) { compactionRequested = true; } @@ -1077,7 +1067,7 @@ public class HRegion implements HConstants { // and that all updates to the log for this regionName that have lower // log-sequence-ids can be safely ignored. this.log.completeCacheFlush(getRegionName(), - regionInfo.getTableDesc().getName(), sequenceId); + regionInfo.getTableDesc().getName(), completeSequenceId); // C. Finally notify anyone waiting on memcache to clear: // e.g. checkResources(). @@ -1099,6 +1089,18 @@ public class HRegion implements HConstants { return compactionRequested; } + /** + * Get the sequence number to be associated with this cache flush. Used by + * TransactionalRegion to not complete pending transactions. + * + * + * @param currentSequenceId + * @return sequence id to complete the cache flush with + */ + protected long getCompleteCacheFlushSequenceId(long currentSequenceId) { + return currentSequenceId; + } + ////////////////////////////////////////////////////////////////////////////// // get() methods for client use. 
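getCompleteCacheFlushSequenceId() is the hook that lets a subclass pin the sequence id recorded when a cache flush completes. A simplified sketch of the idea, assuming hypothetical BaseRegion/TxRegion/PendingTx classes: the transactional subclass returns the oldest start sequence id of any pending transaction, so edits that may still be needed for replay are not declared safely flushed.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// BaseRegion/TxRegion/PendingTx are illustrative, not the HBase classes.
class FlushSequenceIdSketch {
    static class BaseRegion {
        /** Sequence id stamped on a completed cache flush; subclasses may hold it back. */
        protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
            return currentSequenceId;
        }

        long flushCache(long logSequenceId) {
            long completeId = getCompleteCacheFlushSequenceId(logSequenceId);
            // ... flush stores, then mark the log "complete" only up to completeId ...
            return completeId;
        }
    }

    static class PendingTx {
        final long startSequenceId;
        PendingTx(long startSequenceId) { this.startSequenceId = startSequenceId; }
    }

    static class TxRegion extends BaseRegion {
        private final Map<Long, PendingTx> pending = new ConcurrentHashMap<Long, PendingTx>();

        @Override
        protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
            long min = currentSequenceId;
            for (PendingTx tx : pending.values()) {
                min = Math.min(min, tx.startSequenceId); // keep replayable edits in the log
            }
            return min;
        }
    }
}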
////////////////////////////////////////////////////////////////////////////// @@ -1346,7 +1348,33 @@ public class HRegion implements HConstants { * @param b * @throws IOException */ - public void batchUpdate(BatchUpdate b, Integer lockid) + public void batchUpdate(BatchUpdate b) throws IOException { + this.batchUpdate(b, null, true); + } + + /** + * @param b + * @throws IOException + */ + public void batchUpdate(BatchUpdate b, boolean writeToWAL) throws IOException { + this.batchUpdate(b, null, writeToWAL); + } + + + /** + * @param b + * @throws IOException + */ + public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException { + this.batchUpdate(b, lockid, true); + } + + /** + * @param b + * @param writeToWal if true, then we write this update to the log + * @throws IOException + */ + public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL) throws IOException { checkReadOnly(); @@ -1395,7 +1423,7 @@ public class HRegion implements HConstants { this.targetColumns.remove(lid); if (edits != null && edits.size() > 0) { - update(edits); + update(edits, writeToWAL); } if (deletes != null && deletes.size() > 0) { @@ -1597,16 +1625,25 @@ public class HRegion implements HConstants { } targets.put(key, val); } - - /* + + /** * Add updates first to the hlog and then add values to memcache. * Warning: Assumption is caller has lock on passed in row. - * @param row Row to update. - * @param timestamp Timestamp to record the updates against * @param updatesByColumn Cell updates by column * @throws IOException */ - private void update(final TreeMap updatesByColumn) + private void update(final TreeMap updatesByColumn) throws IOException { + this.update(updatesByColumn, true); + } + + /** + * Add updates first to the hlog (if writeToWal) and then add values to memcache. + * Warning: Assumption is caller has lock on passed in row. 
+ * @param writeToWAL if true, then we should write to the log + * @param updatesByColumn Cell updates by column + * @throws IOException + */ + private void update(final TreeMap updatesByColumn, boolean writeToWAL) throws IOException { if (updatesByColumn == null || updatesByColumn.size() <= 0) { return; @@ -1615,8 +1652,10 @@ public class HRegion implements HConstants { boolean flush = false; this.updatesLock.readLock().lock(); try { - this.log.append(regionInfo.getRegionName(), - regionInfo.getTableDesc().getName(), updatesByColumn); + if (writeToWAL) { + this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc() + .getName(), updatesByColumn); + } long size = 0; for (Map.Entry e: updatesByColumn.entrySet()) { HStoreKey key = e.getKey(); @@ -1660,7 +1699,7 @@ public class HRegion implements HConstants { // Do any reconstruction needed from the log @SuppressWarnings("unused") - protected void doReconstructionLog(Path oldLogFile, long maxSeqId, + protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId, Progressable reporter) throws UnsupportedEncodingException, IOException { // Nothing to do (Replaying is done in HStores) @@ -2105,9 +2144,11 @@ public class HRegion implements HConstants { if (!info.isMetaRegion()) { RegionHistorian.getInstance().addRegionCreation(info); } - return new HRegion(tableDir, - new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null), - fs, conf, info, null, null); + HRegion region = new HRegion(tableDir, + new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null), + fs, conf, info, null); + region.initialize(null, null); + return region; } /** @@ -2134,7 +2175,8 @@ public class HRegion implements HConstants { } HRegion r = new HRegion( HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()), - log, FileSystem.get(conf), conf, info, null, null); + log, FileSystem.get(conf), conf, info, null); + r.initialize(null, null); if (log != null) { log.setSequenceNumber(r.getMinSequenceId()); } diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java index 9a6b29c8b26..6926d8b6c23 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java @@ -900,13 +900,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable { protected HRegion instantiateRegion(final HRegionInfo regionInfo) throws IOException { - return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo - .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null, - this.cacheFlusher, new Progressable() { - public void progress() { - addProcessingMessage(regionInfo); - } - }); + HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo + .getTableDesc().getName()), this.log, this.fs, conf, regionInfo, + this.cacheFlusher); + r.initialize(null, new Progressable() { + public void progress() { + addProcessingMessage(regionInfo); + } + }); + return r; } /* diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java index 61dd58e5caa..e90cf72872d 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HStore.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HStore.java @@ -311,7 +311,7 @@ public class HStore implements HConstants { // Check this edit is for me. 
Also, guard against writing // METACOLUMN info such as HBASE::CACHEFLUSH entries byte [] column = val.getColumn(); - if (Bytes.equals(column, HLog.METACOLUMN) + if (val.isTransactionEntry() || Bytes.equals(column, HLog.METACOLUMN) || !Bytes.equals(key.getRegionName(), info.getRegionName()) || !HStoreKey.matchingFamily(family.getName(), column)) { continue; @@ -1316,8 +1316,7 @@ public class HStore implements HConstants { * @return Matching keys. * @throws IOException */ - List getKeys(final HStoreKey origin, final int versions, - final long now) + public List getKeys(final HStoreKey origin, final int versions, final long now) throws IOException { // This code below is very close to the body of the get method. Any // changes in the flow below should also probably be done in get. TODO: diff --git a/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java b/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java new file mode 100644 index 00000000000..44d4629dbb0 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/transactional/CleanOldTransactionsChore.java @@ -0,0 +1,53 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.hadoop.hbase.Chore; +import org.apache.hadoop.hbase.regionserver.HRegion; + +/** + * Cleans up committed transactions when they are no longer needed to verify + * pending transactions. 
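The chore defined here wakes up on a configured interval and asks every online region to prune committed transactions it no longer needs. A rough plain-Java stand-in for the same shape (a daemon loop driven by a stop flag and a sleep interval); the class and field names are illustrative only, and the real class builds on HBase's Chore.

import java.util.concurrent.atomic.AtomicBoolean;

// Plain-Java stand-in; the real class delegates to HBase's Chore.
class PeriodicCleanerSketch extends Thread {
    private final AtomicBoolean stopRequested;
    private final long sleepMillis;
    private final Runnable cleanup;

    PeriodicCleanerSketch(AtomicBoolean stopRequested, long sleepMillis, Runnable cleanup) {
        this.stopRequested = stopRequested;
        this.sleepMillis = sleepMillis;
        this.cleanup = cleanup;
        setDaemon(true);
    }

    @Override
    public void run() {
        while (!stopRequested.get()) {
            cleanup.run();                        // e.g. prune committed transactions per region
            try {
                Thread.sleep(sleepMillis);        // interval comes from configuration in the real chore
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}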
+ */ +class CleanOldTransactionsChore extends Chore { + + private static final String SLEEP_CONF = "hbase.transaction.clean.sleep"; + private static final int DEFAULT_SLEEP = 60 * 1000; + + private final TransactionalRegionServer regionServer; + + public CleanOldTransactionsChore( + final TransactionalRegionServer regionServer, + final AtomicBoolean stopRequest) { + super(regionServer.getConfiguration().getInt(SLEEP_CONF, DEFAULT_SLEEP), + stopRequest); + this.regionServer = regionServer; + } + + @Override + protected void chore() { + for (HRegion region : regionServer.getOnlineRegions()) { + ((TransactionalRegion) region).removeUnNeededCommitedTransactions(); + } + } + +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java new file mode 100644 index 00000000000..9d28e08354c --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java @@ -0,0 +1,296 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.io.BatchOperation; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Holds the state of a transaction. + */ +class TransactionState { + + private static final Log LOG = LogFactory.getLog(TransactionState.class); + + /** Current status. */ + public enum Status { + /** Initial status, still performing operations. */ + PENDING, + /** + * Checked if we can commit, and said yes. Still need to determine the + * global decision. + */ + COMMIT_PENDING, + /** Committed. */ + COMMITED, + /** Aborted. 
*/ + ABORTED + } + + private final long hLogStartSequenceId; + private final long transactionId; + private Status status; + private SortedSet readSet = new TreeSet( + Bytes.BYTES_COMPARATOR); + private List writeSet = new LinkedList(); + private Set transactionsToCheck = new HashSet(); + private int startSequenceNumber; + private Integer sequenceNumber; + boolean hasScan = false; + + public TransactionState(final long transactionId, + final long rLogStartSequenceId) { + this.transactionId = transactionId; + this.hLogStartSequenceId = rLogStartSequenceId; + this.status = Status.PENDING; + } + + public void addRead(final byte[] rowKey) { + readSet.add(rowKey); + } + + public Set getReadSet() { + return readSet; + } + + public void addWrite(final BatchUpdate write) { + writeSet.add(write); + } + + public List getWriteSet() { + return writeSet; + } + + /** + * GetFull from the writeSet. + * + * @param row + * @param columns + * @param timestamp + * @return + */ + public Map localGetFull(final byte[] row, + final Set columns, final long timestamp) { + Map results = new TreeMap( + Bytes.BYTES_COMPARATOR); // Must use the Bytes Conparator because + for (BatchUpdate b : writeSet) { + if (!Bytes.equals(row, b.getRow())) { + continue; + } + if (b.getTimestamp() > timestamp) { + continue; + } + for (BatchOperation op : b) { + if (!op.isPut() + || (columns != null && !columns.contains(op.getColumn()))) { + continue; + } + results.put(op.getColumn(), new Cell(op.getValue(), b.getTimestamp())); + } + } + return results.size() == 0 ? null : results; + } + + /** + * Get from the writeSet. + * + * @param row + * @param column + * @param timestamp + * @return + */ + public Cell[] localGet(final byte[] row, final byte[] column, + final long timestamp) { + ArrayList results = new ArrayList(); + + // Go in reverse order to put newest updates first in list + for (int i = writeSet.size() - 1; i >= 0; i--) { + BatchUpdate b = writeSet.get(i); + + if (!Bytes.equals(row, b.getRow())) { + continue; + } + if (b.getTimestamp() > timestamp) { + continue; + } + for (BatchOperation op : b) { + if (!op.isPut() || !Bytes.equals(column, op.getColumn())) { + continue; + } + results.add(new Cell(op.getValue(), b.getTimestamp())); + } + } + return results.size() == 0 ? null : results + .toArray(new Cell[results.size()]); + } + + public void addTransactionToCheck(final TransactionState transaction) { + transactionsToCheck.add(transaction); + } + + public boolean hasConflict() { + for (TransactionState transactionState : transactionsToCheck) { + if (hasConflict(transactionState)) { + return true; + } + } + return false; + } + + private boolean hasConflict(final TransactionState checkAgainst) { + if (checkAgainst.getStatus().equals(TransactionState.Status.ABORTED)) { + return false; // Cannot conflict with aborted transactions + } + + for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) { + if (this.hasScan) { + LOG.info("Transaction" + this.toString() + + " has a scan read. Meanwile a write occured. " + + "Conservitivly reporting conflict"); + return true; + } + + if (this.getReadSet().contains(otherUpdate.getRow())) { + LOG.trace("Transaction " + this.toString() + " conflicts with " + + checkAgainst.toString()); + return true; + } + } + return false; + } + + /** + * Get the status. + * + * @return Return the status. + */ + public Status getStatus() { + return status; + } + + /** + * Set the status. + * + * @param status The status to set. 
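The conflict test above is the heart of the optimistic scheme: a transaction is rejected if any transaction that committed while it ran wrote a row in its read set, and a transaction that scanned is conservatively treated as having read the whole region. A self-contained sketch of that rule with plain Java collections; the names are illustrative, not the TransactionState API.

import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Simplified model of the read-set / write-set conflict check.
class OccConflictCheckSketch {
    static boolean hasConflict(Set<String> myReadSet, boolean myHasScan,
                               List<String> otherCommittedWrites) {
        for (String writtenRow : otherCommittedWrites) {
            if (myHasScan) {
                return true;                 // conservative: any concurrent write conflicts
            }
            if (myReadSet.contains(writtenRow)) {
                return true;                 // read-write overlap
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> reads = new TreeSet<String>(Arrays.asList("row1", "row2"));
        System.out.println(hasConflict(reads, false, Arrays.asList("row2"))); // true
        System.out.println(hasConflict(reads, false, Arrays.asList("row9"))); // false
    }
}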
+ */ + public void setStatus(final Status status) { + this.status = status; + } + + /** + * Get the startSequenceNumber. + * + * @return Return the startSequenceNumber. + */ + public int getStartSequenceNumber() { + return startSequenceNumber; + } + + /** + * Set the startSequenceNumber. + * + * @param startSequenceNumber. + */ + public void setStartSequenceNumber(final int startSequenceNumber) { + this.startSequenceNumber = startSequenceNumber; + } + + /** + * Get the sequenceNumber. + * + * @return Return the sequenceNumber. + */ + public Integer getSequenceNumber() { + return sequenceNumber; + } + + /** + * Set the sequenceNumber. + * + * @param sequenceNumber The sequenceNumber to set. + */ + public void setSequenceNumber(final Integer sequenceNumber) { + this.sequenceNumber = sequenceNumber; + } + + @Override + public String toString() { + StringBuilder result = new StringBuilder(); + result.append("[transactionId: "); + result.append(transactionId); + result.append(" status: "); + result.append(status.name()); + result.append(" read Size: "); + result.append(readSet.size()); + result.append(" write Size: "); + result.append(writeSet.size()); + result.append(" startSQ: "); + result.append(startSequenceNumber); + if (sequenceNumber != null) { + result.append(" commitedSQ:"); + result.append(sequenceNumber); + } + result.append("]"); + + return result.toString(); + } + + /** + * Get the transactionId. + * + * @return Return the transactionId. + */ + public long getTransactionId() { + return transactionId; + } + + /** + * Get the startSequenceId. + * + * @return Return the startSequenceId. + */ + public long getHLogStartSequenceId() { + return hLogStartSequenceId; + } + + /** + * Set the hasScan. + * + * @param hasScan The hasScan to set. + */ + public void setHasScan(final boolean hasScan) { + this.hasScan = hasScan; + } + +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java new file mode 100644 index 00000000000..f14652cd72a --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalHLogManager.java @@ -0,0 +1,260 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.io.BatchOperation; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.regionserver.HLog; +import org.apache.hadoop.hbase.regionserver.HLogEdit; +import org.apache.hadoop.hbase.regionserver.HLogKey; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.util.Progressable; + +/** + * Responsible for writing and reading (recovering) transactional information + * to/from the HLog. + * + * + */ +class TransactionalHLogManager { + + private static final Log LOG = LogFactory + .getLog(TransactionalHLogManager.class); + + private final HLog hlog; + private final FileSystem fileSystem; + private final HRegionInfo regionInfo; + private final HBaseConfiguration conf; + + public TransactionalHLogManager(final TransactionalRegion region) { + this.hlog = region.getLog(); + this.fileSystem = region.getFilesystem(); + this.regionInfo = region.getRegionInfo(); + this.conf = region.getConf(); + } + + // For Testing + TransactionalHLogManager(final HLog hlog, final FileSystem fileSystem, + final HRegionInfo regionInfo, final HBaseConfiguration conf) { + this.hlog = hlog; + this.fileSystem = fileSystem; + this.regionInfo = regionInfo; + this.conf = conf; + } + + public void writeStartToLog(final long transactionId) throws IOException { + HLogEdit logEdit; + logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.START); + + hlog.append(regionInfo, logEdit); + } + + public void writeUpdateToLog(final long transactionId, + final BatchUpdate update) throws IOException { + + long commitTime = update.getTimestamp() == HConstants.LATEST_TIMESTAMP ? System + .currentTimeMillis() + : update.getTimestamp(); + + for (BatchOperation op : update) { + HLogEdit logEdit = new HLogEdit(transactionId, op, commitTime); + hlog.append(regionInfo, update.getRow(), logEdit); + } + } + + public void writeCommitToLog(final long transactionId) throws IOException { + HLogEdit logEdit; + logEdit = new HLogEdit(transactionId, + HLogEdit.TransactionalOperation.COMMIT); + + hlog.append(regionInfo, logEdit); + } + + public void writeAbortToLog(final long transactionId) throws IOException { + HLogEdit logEdit; + logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.ABORT); + + hlog.append(regionInfo, logEdit); + } + + public Map> getCommitsFromLog( + final Path reconstructionLog, final long maxSeqID, + final Progressable reporter) throws UnsupportedEncodingException, + IOException { + if (reconstructionLog == null || !fileSystem.exists(reconstructionLog)) { + // Nothing to do. + return null; + } + // Check its not empty. 
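getCommitsFromLog() reconstructs transactions from the per-operation markers written above (START, WRITE, COMMIT, ABORT) and keeps only the fully committed ones. A simplified model of that pass with stand-in types; the corrupted-log error handling of the real method is omitted.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Simplified replay pass over transactional log entries.
class TxLogReplaySketch {
    enum Op { START, WRITE, COMMIT, ABORT }

    static class Entry {
        final long txId; final Op op; final String row;
        Entry(long txId, Op op, String row) { this.txId = txId; this.op = op; this.row = row; }
    }

    static Map<Long, List<String>> commitsFromLog(List<Entry> log) {
        Map<Long, List<String>> pending = new HashMap<Long, List<String>>();
        Map<Long, List<String>> committed = new HashMap<Long, List<String>>();
        Set<Long> aborted = new HashSet<Long>();
        for (Entry e : log) {
            switch (e.op) {
                case START:
                    pending.put(e.txId, new ArrayList<String>());
                    break;
                case WRITE:
                    pending.get(e.txId).add(e.row);
                    break;
                case ABORT:
                    aborted.add(e.txId);
                    pending.remove(e.txId);
                    break;
                case COMMIT:
                    committed.put(e.txId, pending.remove(e.txId));
                    break;
            }
        }
        // Anything left in "pending" never resolved; the real code would have to
        // consult a global transaction log for those.
        return committed;
    }
}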
+ FileStatus[] stats = fileSystem.listStatus(reconstructionLog); + if (stats == null || stats.length == 0) { + LOG.warn("Passed reconstruction log " + reconstructionLog + + " is zero-length"); + return null; + } + + SortedMap> pendingTransactionsById = new TreeMap>(); + SortedMap> commitedTransactionsById = new TreeMap>(); + Set abortedTransactions = new HashSet(); + + SequenceFile.Reader logReader = new SequenceFile.Reader(fileSystem, + reconstructionLog, conf); + + try { + HLogKey key = new HLogKey(); + HLogEdit val = new HLogEdit(); + long skippedEdits = 0; + long totalEdits = 0; + long startCount = 0; + long writeCount = 0; + long abortCount = 0; + long commitCount = 0; + // How many edits to apply before we send a progress report. + int reportInterval = conf.getInt("hbase.hstore.report.interval.edits", + 2000); + while (logReader.next(key, val)) { + LOG.debug("Processing edit: key: " + key.toString() + " val: " + + val.toString()); + if (key.getLogSeqNum() < maxSeqID) { + skippedEdits++; + continue; + } + + // Check this edit is for me. + byte[] column = val.getColumn(); + Long transactionId = val.getTransactionId(); + if (!val.isTransactionEntry() || HLog.isMetaColumn(column) + || !Bytes.equals(key.getRegionName(), regionInfo.getRegionName())) { + continue; + } + + List updates = pendingTransactionsById.get(transactionId); + switch (val.getOperation()) { + case START: + if (updates != null || abortedTransactions.contains(transactionId) + || commitedTransactionsById.containsKey(transactionId)) { + LOG.error("Processing start for transaction: " + transactionId + + ", but have already seen start message"); + throw new IOException("Corrupted transaction log"); + } + updates = new LinkedList(); + pendingTransactionsById.put(transactionId, updates); + startCount++; + break; + + case WRITE: + if (updates == null) { + LOG.error("Processing edit for transaction: " + transactionId + + ", but have not seen start message"); + throw new IOException("Corrupted transaction log"); + } + + BatchUpdate tranUpdate = new BatchUpdate(key.getRow()); + if (val.getVal() != null) { + tranUpdate.put(val.getColumn(), val.getVal()); + } else { + tranUpdate.delete(val.getColumn()); + } + updates.add(tranUpdate); + writeCount++; + break; + + case ABORT: + if (updates == null) { + LOG.error("Processing abort for transaction: " + transactionId + + ", but have not seen start message"); + throw new IOException("Corrupted transaction log"); + } + abortedTransactions.add(transactionId); + pendingTransactionsById.remove(transactionId); + abortCount++; + break; + + case COMMIT: + if (updates == null) { + LOG.error("Processing commit for transaction: " + transactionId + + ", but have not seen start message"); + throw new IOException("Corrupted transaction log"); + } + if (abortedTransactions.contains(transactionId)) { + LOG.error("Processing commit for transaction: " + transactionId + + ", but also have abort message"); + throw new IOException("Corrupted transaction log"); + } + if (updates.size() == 0) { + LOG + .warn("Transaciton " + transactionId + + " has no writes in log. 
"); + } + if (commitedTransactionsById.containsKey(transactionId)) { + LOG.error("Processing commit for transaction: " + transactionId + + ", but have already commited transaction with that id"); + throw new IOException("Corrupted transaction log"); + } + pendingTransactionsById.remove(transactionId); + commitedTransactionsById.put(transactionId, updates); + commitCount++; + } + totalEdits++; + + if (reporter != null && (totalEdits % reportInterval) == 0) { + reporter.progress(); + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Read " + totalEdits + " tranasctional operations (skipped " + + skippedEdits + " because sequence id <= " + maxSeqID + "): " + + startCount + " starts, " + writeCount + " writes, " + abortCount + + " aborts, and " + commitCount + " commits."); + } + } finally { + logReader.close(); + } + + if (pendingTransactionsById.size() > 0) { + LOG + .info("Region log has " + + pendingTransactionsById.size() + + " unfinished transactions. Going to the transaction log to resolve"); + throw new RuntimeException("Transaction log not yet implemented"); + } + + return commitedTransactionsById; + } +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java new file mode 100644 index 00000000000..122518913e3 --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java @@ -0,0 +1,673 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.io.IOException; +import java.io.UnsupportedEncodingException; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.SortedMap; +import java.util.TreeMap; +import java.util.Map.Entry; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HStoreKey; +import org.apache.hadoop.hbase.LeaseException; +import org.apache.hadoop.hbase.LeaseListener; +import org.apache.hadoop.hbase.Leases; +import org.apache.hadoop.hbase.Leases.LeaseStillHeldException; +import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.regionserver.FlushRequester; +import org.apache.hadoop.hbase.regionserver.HLog; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HStore; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.util.Progressable; + +/** + * Regionserver which provides transactional support for atomic transactions. + * This is achieved with optimistic concurrency control (see + * http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf). We keep track + * read and write sets for each transaction, and hold off on processing the + * writes. To decide to commit a transaction we check its read sets with all + * transactions that have committed while it was running for overlaps. + *

+ * Because transactions can span multiple regions, all regions must agree to + * commit a transaction. The client side of this commit protocol is encoded in + * org.apache.hadoop.hbase.client.transactional.TransactionManager + *

+ * In the event of an failure of the client mid-commit, (after we voted yes), we + * will have to consult the transaction log to determine the final decision of + * the transaction. This is not yet implemented. + */ +class TransactionalRegion extends HRegion { + + private static final String LEASE_TIME = "hbase.transaction.leaseTime"; + private static final int DEFAULT_LEASE_TIME = 60 * 1000; + private static final int LEASE_CHECK_FREQUENCY = 1000; + + private static final String OLD_TRANSACTION_FLUSH = "hbase.transaction.flush"; + private static final int DEFAULT_OLD_TRANSACTION_FLUSH = 100; // Do a flush if we have this many old transactions.. + + + private static final Log LOG = LogFactory.getLog(TransactionalRegion.class); + + // Collection of active transactions (PENDING) keyed by id. + private Map transactionsById = new HashMap(); + + // Map of recent transactions that are COMMIT_PENDING or COMMITED keyed by + // their sequence number + private SortedMap commitedTransactionsBySequenceNumber = Collections + .synchronizedSortedMap(new TreeMap()); + + // Collection of transactions that are COMMIT_PENDING + private Set commitPendingTransactions = Collections + .synchronizedSet(new HashSet()); + + private final Leases transactionLeases; + private AtomicInteger nextSequenceId = new AtomicInteger(0); + private Object commitCheckLock = new Object(); + private TransactionalHLogManager logManager; + private final int oldTransactionFlushTrigger; + + public TransactionalRegion(final Path basedir, final HLog log, + final FileSystem fs, final HBaseConfiguration conf, + final HRegionInfo regionInfo, final FlushRequester flushListener) { + super(basedir, log, fs, conf, regionInfo, flushListener); + transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME), + LEASE_CHECK_FREQUENCY); + logManager = new TransactionalHLogManager(this); + oldTransactionFlushTrigger = conf.getInt(OLD_TRANSACTION_FLUSH, DEFAULT_OLD_TRANSACTION_FLUSH); + } + + @Override + protected void doReconstructionLog(final Path oldLogFile, + final long minSeqId, final long maxSeqId, final Progressable reporter) + throws UnsupportedEncodingException, IOException { + super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter); + + Map> commitedTransactionsById = logManager + .getCommitsFromLog(oldLogFile, minSeqId, reporter); + + if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) { + LOG.debug("found " + commitedTransactionsById.size() + + " COMMITED transactions"); + + for (Entry> entry : commitedTransactionsById + .entrySet()) { + LOG.debug("Writing " + entry.getValue().size() + + " updates for transaction " + entry.getKey()); + for (BatchUpdate b : entry.getValue()) { + super.batchUpdate(b, true); // These are walled so they live forever + } + } + + // LOG.debug("Flushing cache"); // We must trigger a cache flush, + // otherwise + // we will would ignore the log on subsequent failure + // if (!super.flushcache()) { + // LOG.warn("Did not flush cache"); + // } + } + } + + /** + * We need to make sure that we don't complete a cache flush between running + * transactions. If we did, then we would not find all log messages needed to + * restore the transaction, as some of them would be before the last + * "complete" flush id. 
+ */ + @Override + protected long getCompleteCacheFlushSequenceId(final long currentSequenceId) { + long minPendingStartSequenceId = currentSequenceId; + for (TransactionState transactionState : transactionsById.values()) { + minPendingStartSequenceId = Math.min(minPendingStartSequenceId, + transactionState.getHLogStartSequenceId()); + } + return minPendingStartSequenceId; + } + + public void beginTransaction(final long transactionId) throws IOException { + String key = String.valueOf(transactionId); + if (transactionsById.get(key) != null) { + TransactionState alias = getTransactionState(transactionId); + if (alias != null) { + alias.setStatus(Status.ABORTED); + retireTransaction(alias); + } + throw new IOException("Already exiting transaction id: " + key); + } + + TransactionState state = new TransactionState(transactionId, super.getLog() + .getSequenceNumber()); + + // Order is important here + for (TransactionState commitPending : commitPendingTransactions) { + state.addTransactionToCheck(commitPending); + } + state.setStartSequenceNumber(nextSequenceId.get()); + + transactionsById.put(String.valueOf(key), state); + try { + transactionLeases.createLease(key, new TransactionLeaseListener(key)); + } catch (LeaseStillHeldException e) { + throw new RuntimeException(e); + } + LOG.debug("Begining transaction " + key + " in region " + + super.getRegionInfo().getRegionNameAsString()); + logManager.writeStartToLog(transactionId); + + maybeTriggerOldTransactionFlush(); + } + + /** + * Fetch a single data item. + * + * @param transactionId + * @param row + * @param column + * @return column value + * @throws IOException + */ + public Cell get(final long transactionId, final byte[] row, + final byte[] column) throws IOException { + Cell[] results = get(transactionId, row, column, 1); + return (results == null || results.length == 0) ? null : results[0]; + } + + /** + * Fetch multiple versions of a single data item + * + * @param transactionId + * @param row + * @param column + * @param numVersions + * @return array of values one element per version + * @throws IOException + */ + public Cell[] get(final long transactionId, final byte[] row, + final byte[] column, final int numVersions) throws IOException { + return get(transactionId, row, column, Long.MAX_VALUE, numVersions); + } + + /** + * Fetch multiple versions of a single data item, with timestamp. 
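The transactional get methods first consult the transaction's own write set and let those buffered values shadow whatever is currently stored in the region, so a transaction reads its own uncommitted writes. A minimal illustration using plain maps in place of rows of Cells; the names are illustrative.

import java.util.Map;
import java.util.TreeMap;

// In-transaction writes shadow the stored values.
class ReadYourWritesSketch {
    static Map<String, String> getFull(Map<String, String> storedRow,
                                       Map<String, String> localWrites) {
        Map<String, String> result = new TreeMap<String, String>(storedRow);
        if (localWrites != null) {
            result.putAll(localWrites);          // in-transaction writes win
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> stored = new TreeMap<String, String>();
        stored.put("info:a", "old");
        stored.put("info:b", "kept");
        Map<String, String> local = new TreeMap<String, String>();
        local.put("info:a", "new");
        System.out.println(getFull(stored, local)); // {info:a=new, info:b=kept}
    }
}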
+ * + * @param transactionId + * @param row + * @param column + * @param timestamp + * @param numVersions + * @return array of values one element per version that matches the timestamp + * @throws IOException + */ + public Cell[] get(final long transactionId, final byte[] row, + final byte[] column, final long timestamp, final int numVersions) + throws IOException { + TransactionState state = getTransactionState(transactionId); + + state.addRead(row); + + Cell[] localCells = state.localGet(row, column, timestamp); + + if (localCells != null && localCells.length > 0) { + LOG + .trace("Transactional get of something we've written in the same transaction " + + transactionId); + LOG.trace("row: " + Bytes.toString(row)); + LOG.trace("col: " + Bytes.toString(column)); + LOG.trace("numVersions: " + numVersions); + for (Cell cell : localCells) { + LOG.trace("cell: " + Bytes.toString(cell.getValue())); + } + + if (numVersions > 1) { + Cell[] globalCells = get(row, column, timestamp, numVersions - 1); + Cell[] result = new Cell[globalCells.length + localCells.length]; + System.arraycopy(localCells, 0, result, 0, localCells.length); + System.arraycopy(globalCells, 0, result, localCells.length, + globalCells.length); + return result; + } + return localCells; + } + + return get(row, column, timestamp, numVersions); + } + + /** + * Fetch all the columns for the indicated row at a specified timestamp. + * Returns a TreeMap that maps column names to values. + * + * @param transactionId + * @param row + * @param columns Array of columns you'd like to retrieve. When null, get all. + * @param ts + * @return Map values + * @throws IOException + */ + public Map getFull(final long transactionId, final byte[] row, + final Set columns, final long ts) throws IOException { + TransactionState state = getTransactionState(transactionId); + + state.addRead(row); + + Map localCells = state.localGetFull(row, columns, ts); + + if (localCells != null && localCells.size() > 0) { + LOG + .trace("Transactional get of something we've written in the same transaction " + + transactionId); + LOG.trace("row: " + Bytes.toString(row)); + for (Entry entry : localCells.entrySet()) { + LOG.trace("col: " + Bytes.toString(entry.getKey())); + LOG.trace("cell: " + Bytes.toString(entry.getValue().getValue())); + } + + Map internalResults = getFull(row, columns, ts, null); + internalResults.putAll(localCells); + return internalResults; + } + + return getFull(row, columns, ts, null); + } + + /** + * Return an iterator that scans over the HRegion, returning the indicated + * columns for only the rows that match the data filter. This Iterator must be + * closed by the caller. + * + * @param transactionId + * @param cols columns to scan. If column name is a column family, all columns + * of the specified column family are returned. Its also possible to pass a + * regex in the column qualifier. A column qualifier is judged to be a regex + * if it contains at least one of the following characters: + * \+|^&*$[]]}{)(. 
+ * @param firstRow row which is the starting point of the scan + * @param timestamp only return rows whose timestamp is <= this value + * @param filter row filter + * @return InternalScanner + * @throws IOException + */ + public InternalScanner getScanner(final long transactionId, + final byte[][] cols, final byte[] firstRow, final long timestamp, + final RowFilterInterface filter) throws IOException { + return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow, + timestamp, filter)); + } + + /** + * Add a write to the transaction. Does not get applied until commit process. + * + * @param b + * @throws IOException + */ + public void batchUpdate(final long transactionId, final BatchUpdate b) + throws IOException { + TransactionState state = getTransactionState(transactionId); + state.addWrite(b); + logManager.writeUpdateToLog(transactionId, b); + } + + /** + * Add a delete to the transaction. Does not get applied until commit process. + * FIXME, not sure about this approach + * + * @param b + * @throws IOException + */ + public void deleteAll(final long transactionId, final byte[] row, + final long timestamp) throws IOException { + TransactionState state = getTransactionState(transactionId); + long now = System.currentTimeMillis(); + + for (HStore store : super.stores.values()) { + List keys = store.getKeys(new HStoreKey(row, timestamp), + ALL_VERSIONS, now); + BatchUpdate deleteUpdate = new BatchUpdate(row, timestamp); + + for (HStoreKey key : keys) { + deleteUpdate.delete(key.getColumn()); + } + + state.addWrite(deleteUpdate); + logManager.writeUpdateToLog(transactionId, deleteUpdate); + + } + + } + + public boolean commitRequest(final long transactionId) throws IOException { + synchronized (commitCheckLock) { + TransactionState state = getTransactionState(transactionId); + if (state == null) { + return false; + } + + if (hasConflict(state)) { + state.setStatus(Status.ABORTED); + retireTransaction(state); + return false; + } + + // No conflicts, we can commit. + LOG.trace("No conflicts for transaction " + transactionId + + " found in region " + super.getRegionInfo().getRegionNameAsString() + + ". Voting for commit"); + state.setStatus(Status.COMMIT_PENDING); + + // If there are writes we must keep record off the transaction + if (state.getWriteSet().size() > 0) { + // Order is important + commitPendingTransactions.add(state); + state.setSequenceNumber(nextSequenceId.getAndIncrement()); + commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(), + state); + } + + return true; + } + } + + private boolean hasConflict(final TransactionState state) { + // Check transactions that were committed while we were running + for (int i = state.getStartSequenceNumber(); i < nextSequenceId.get(); i++) { + TransactionState other = commitedTransactionsBySequenceNumber.get(i); + if (other == null) { + continue; + } + state.addTransactionToCheck(other); + } + + return state.hasConflict(); + } + + /** + * Commit the transaction. 
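commitRequest()/commit()/abort() form the region-side half of the commit protocol; the client-side coordinator asks every participating region to vote and only sends commit() if all vote yes, otherwise it aborts everywhere. A hedged sketch of that driver, where Participant is an illustrative stand-in for the per-region RPC rather than an actual HBase interface.

import java.util.List;

// Two-phase commit driver sketch over a hypothetical per-region Participant interface.
class TwoPhaseCommitSketch {
    interface Participant {
        boolean commitRequest(long txId) throws Exception;
        void commit(long txId) throws Exception;
        void abort(long txId) throws Exception;
    }

    static boolean tryCommit(long txId, List<Participant> regions) throws Exception {
        for (Participant p : regions) {
            if (!p.commitRequest(txId)) {         // any "no" vote aborts the transaction
                for (Participant q : regions) {
                    q.abort(txId);
                }
                return false;
            }
        }
        for (Participant p : regions) {           // all voted yes: commit everywhere
            p.commit(txId);
        }
        return true;
    }
}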
+ * + * @param transactionId + * @return + * @throws IOException + */ + public void commit(final long transactionId) throws IOException { + TransactionState state; + try { + state = getTransactionState(transactionId); + } catch (UnknownTransactionException e) { + LOG.fatal("Asked to commit unknown transaction: " + transactionId + + " in region " + super.getRegionInfo().getRegionNameAsString()); + // FIXME Write to the transaction log that this transaction was corrupted + throw e; + } + + if (!state.getStatus().equals(Status.COMMIT_PENDING)) { + LOG.fatal("Asked to commit a non pending transaction"); + // FIXME Write to the transaction log that this transaction was corrupted + throw new IOException("commit failure"); + } + + commit(state); + } + + /** + * Commit the transaction. + * + * @param transactionId + * @return + * @throws IOException + */ + public void abort(final long transactionId) throws IOException { + TransactionState state; + try { + state = getTransactionState(transactionId); + } catch (UnknownTransactionException e) { + LOG.error("Asked to abort unknown transaction: " + transactionId); + return; + } + + state.setStatus(Status.ABORTED); + + if (state.getWriteSet().size() > 0) { + logManager.writeAbortToLog(state.getTransactionId()); + } + + // Following removes needed if we have voted + if (state.getSequenceNumber() != null) { + commitedTransactionsBySequenceNumber.remove(state.getSequenceNumber()); + } + commitPendingTransactions.remove(state); + + retireTransaction(state); + } + + private void commit(final TransactionState state) throws IOException { + + LOG.debug("Commiting transaction: " + state.toString() + " to " + + super.getRegionInfo().getRegionNameAsString()); + + if (state.getWriteSet().size() > 0) { + logManager.writeCommitToLog(state.getTransactionId()); + } + + for (BatchUpdate update : state.getWriteSet()) { + super.batchUpdate(update, false); // Don't need to WAL these + // FIME, maybe should be walled so we don't need to look so far back. + } + + state.setStatus(Status.COMMITED); + if (state.getWriteSet().size() > 0 + && !commitPendingTransactions.remove(state)) { + LOG + .fatal("Commiting a non-query transaction that is not in commitPendingTransactions"); + throw new IOException("commit failure"); // FIXME, how to handle? + } + retireTransaction(state); + } + + // Cancel leases, and removed from lease lookup. This transaction may still + // live in commitedTransactionsBySequenceNumber and commitPendingTransactions + private void retireTransaction(final TransactionState state) { + String key = String.valueOf(state.getTransactionId()); + try { + transactionLeases.cancelLease(key); + } catch (LeaseException e) { + // Ignore + } + + transactionsById.remove(key); + } + + private TransactionState getTransactionState(final long transactionId) + throws UnknownTransactionException { + String key = String.valueOf(transactionId); + TransactionState state = null; + + state = transactionsById.get(key); + + if (state == null) { + LOG.trace("Unknown transaction: " + key); + throw new UnknownTransactionException(key); + } + + try { + transactionLeases.renewLease(key); + } catch (LeaseException e) { + throw new RuntimeException(e); + } + + return state; + } + + private void maybeTriggerOldTransactionFlush() { + if (commitedTransactionsBySequenceNumber.size() > oldTransactionFlushTrigger) { + removeUnNeededCommitedTransactions(); + } + } + + /** + * Cleanup references to committed transactions that are no longer needed. 
+ * + */ + synchronized void removeUnNeededCommitedTransactions() { + Integer minStartSeqNumber = getMinStartSequenceNumber(); + if (minStartSeqNumber == null) { + minStartSeqNumber = Integer.MAX_VALUE; // Remove all + } + + int numRemoved = 0; + // Copy list to avoid conc update exception + for (Entry entry : new LinkedList>( + commitedTransactionsBySequenceNumber.entrySet())) { + if (entry.getKey() >= minStartSeqNumber) { + break; + } + numRemoved = numRemoved + + (commitedTransactionsBySequenceNumber.remove(entry.getKey()) == null ? 0 + : 1); + numRemoved++; + } + + if (numRemoved > 0) { + LOG.debug("Removed " + numRemoved + + " commited transactions with sequence lower than " + + minStartSeqNumber + ". Still have " + + commitedTransactionsBySequenceNumber.size() + " left"); + } else if (commitedTransactionsBySequenceNumber.size() > 0) { + LOG.debug("Could not remove any transactions, and still have " + + commitedTransactionsBySequenceNumber.size() + " left"); + } + } + + private Integer getMinStartSequenceNumber() { + Integer min = null; + for (TransactionState transactionState : transactionsById.values()) { + if (min == null || transactionState.getStartSequenceNumber() < min) { + min = transactionState.getStartSequenceNumber(); + } + } + return min; + } + + // TODO, resolve from the global transaction log + @SuppressWarnings("unused") + private void resolveTransactionFromLog(final long transactionId) { + throw new RuntimeException("Globaql transaction log is not Implemented"); + } + + private class TransactionLeaseListener implements LeaseListener { + private final String transactionName; + + TransactionLeaseListener(final String n) { + this.transactionName = n; + } + + /** {@inheritDoc} */ + public void leaseExpired() { + LOG.info("Transaction " + this.transactionName + " lease expired"); + TransactionState s = null; + synchronized (transactionsById) { + s = transactionsById.remove(transactionName); + } + if (s == null) { + LOG.warn("Unknown transaction expired " + this.transactionName); + return; + } + + switch (s.getStatus()) { + case PENDING: + s.setStatus(Status.ABORTED); // Other transactions may have a ref + break; + case COMMIT_PENDING: + LOG.info("Transaction " + s.getTransactionId() + + " expired in COMMIT_PENDING state"); + LOG.info("Checking transaction status in transaction log"); + resolveTransactionFromLog(s.getTransactionId()); + break; + default: + LOG.warn("Unexpected status on expired lease"); + } + } + } + + /** Wrapper which keeps track of rows returned by scanner. */ + private class ScannerWrapper implements InternalScanner { + private long transactionId; + private InternalScanner scanner; + + public ScannerWrapper(final long transactionId, + final InternalScanner scanner) { + this.transactionId = transactionId; + this.scanner = scanner; + } + + public void close() throws IOException { + scanner.close(); + } + + public boolean isMultipleMatchScanner() { + return scanner.isMultipleMatchScanner(); + } + + public boolean isWildcardScanner() { + return scanner.isWildcardScanner(); + } + + public boolean next(final HStoreKey key, + final SortedMap results) throws IOException { + boolean result = scanner.next(key, results); + TransactionState state = getTransactionState(transactionId); + state.setHasScan(true); + // FIXME, not using row, just claiming read over the whole region. We are + // being very conservative on scans to avoid phantom reads. 
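removeUnNeededCommitedTransactions() above relies on the fact that a committed transaction only matters for conflict checks against transactions that started before it committed, so anything with a sequence number below the minimum start sequence of the still-pending transactions can be dropped. A small sketch of that pruning rule using a plain sorted map; the names are illustrative.

import java.util.SortedMap;
import java.util.TreeMap;

// Prune committed transactions no pending transaction can conflict with.
class CommittedTxPruneSketch {
    static int prune(SortedMap<Integer, String> committedBySeq, Integer minPendingStartSeq) {
        if (minPendingStartSeq == null) {
            minPendingStartSeq = Integer.MAX_VALUE;  // nothing pending: drop everything
        }
        SortedMap<Integer, String> stale = committedBySeq.headMap(minPendingStartSeq);
        int removed = stale.size();
        stale.clear();                               // headMap view writes through to the map
        return removed;
    }

    public static void main(String[] args) {
        SortedMap<Integer, String> committed = new TreeMap<Integer, String>();
        committed.put(1, "tx-a"); committed.put(5, "tx-b"); committed.put(9, "tx-c");
        System.out.println(prune(committed, 6));     // removes tx-a and tx-b -> 2
        System.out.println(committed);               // {9=tx-c}
    }
}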
+ state.addRead(key.getRow()); + + if (result) { + Map localWrites = state.localGetFull(key.getRow(), null, + Integer.MAX_VALUE); + if (localWrites != null) { + LOG + .info("Scanning over row that has been writen to " + + transactionId); + for (Entry entry : localWrites.entrySet()) { + results.put(entry.getKey(), entry.getValue()); + } + } + } + + return result; + } + } +} diff --git a/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java new file mode 100644 index 00000000000..d77eb14edee --- /dev/null +++ b/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegionServer.java @@ -0,0 +1,296 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.io.IOException; +import java.lang.Thread.UncaughtExceptionHandler; +import java.util.Arrays; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HServerAddress; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.RemoteExceptionHandler; +import org.apache.hadoop.hbase.filter.RowFilterInterface; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.io.Cell; +import org.apache.hadoop.hbase.io.HbaseMapWritable; +import org.apache.hadoop.hbase.io.RowResult; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.regionserver.InternalScanner; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.io.MapWritable; +import org.apache.hadoop.util.Progressable; + +/** + * RegionServer with support for transactions. Transactional logic is at the + * region level, so we mostly just delegate to the appropriate + * TransactionalRegion. 
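The region server methods that follow all repeat the same guard: verify the server is open, bump the request counter, delegate to the transactional region, and re-check the filesystem when the call throws. A generic sketch of that pattern with illustrative names; only the shape of the guard is taken from the code below.

import java.io.IOException;
import java.util.concurrent.atomic.AtomicInteger;

// Generic sketch of the check-open / count / delegate / check-filesystem guard.
class RpcGuardSketch {
    private final AtomicInteger requestCount = new AtomicInteger(0);
    private volatile boolean open = true;

    interface RegionCall<T> { T call() throws IOException; }

    <T> T guarded(RegionCall<T> call) throws IOException {
        if (!open) {
            throw new IOException("region server is stopping");
        }
        requestCount.incrementAndGet();
        try {
            return call.call();
        } catch (IOException e) {
            checkFileSystem();                    // the real server probes HDFS health here
            throw e;
        }
    }

    private void checkFileSystem() {
        // placeholder: the real implementation verifies the filesystem is still usable
    }
}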
+ */ +public class TransactionalRegionServer extends HRegionServer implements + TransactionalRegionInterface { + static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class); + + private final CleanOldTransactionsChore cleanOldTransactionsThread; + + public TransactionalRegionServer(final HBaseConfiguration conf) + throws IOException { + this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS, + DEFAULT_REGIONSERVER_ADDRESS)), conf); + } + + public TransactionalRegionServer(final HServerAddress address, + final HBaseConfiguration conf) throws IOException { + super(address, conf); + cleanOldTransactionsThread = new CleanOldTransactionsChore(this, + super.stopRequested); + } + + /** {@inheritDoc} */ + @Override + public long getProtocolVersion(final String protocol, final long clientVersion) + throws IOException { + if (protocol.equals(TransactionalRegionInterface.class.getName())) { + return TransactionalRegionInterface.versionID; + } + return super.getProtocolVersion(protocol, clientVersion); + } + + @Override + protected void init(final MapWritable c) throws IOException { + super.init(c); + String n = Thread.currentThread().getName(); + UncaughtExceptionHandler handler = new UncaughtExceptionHandler() { + public void uncaughtException(final Thread t, final Throwable e) { + abort(); + LOG.fatal("Set stop flag in " + t.getName(), e); + } + }; + Threads.setDaemonThreadRunning(this.cleanOldTransactionsThread, n + + ".oldTransactionCleaner", handler); + + } + + @Override + protected HRegion instantiateRegion(final HRegionInfo regionInfo) + throws IOException { + HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super + .getRootDir(), regionInfo.getTableDesc().getName()), super.log, super + .getFileSystem(), super.conf, regionInfo, super.getFlushRequester()); + r.initialize(null, new Progressable() { + public void progress() { + addProcessingMessage(regionInfo); + } + }); + return r; + } + + protected TransactionalRegion getTransactionalRegion(final byte[] regionName) + throws NotServingRegionException { + return (TransactionalRegion) super.getRegion(regionName); + } + + public void abort(final byte[] regionName, final long transactionId) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + getTransactionalRegion(regionName).abort(transactionId); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public void batchUpdate(final long transactionId, final byte[] regionName, + final BatchUpdate b) throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + getTransactionalRegion(regionName).batchUpdate(transactionId, b); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public void commit(final byte[] regionName, final long transactionId) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + getTransactionalRegion(regionName).commit(transactionId); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public boolean commitRequest(final byte[] regionName, final long transactionId) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + return getTransactionalRegion(regionName).commitRequest(transactionId); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public Cell get(final long transactionId, final byte[] regionName, + final byte[] row, final byte[] column) throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { 
+ return getTransactionalRegion(regionName).get(transactionId, row, column); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public Cell[] get(final long transactionId, final byte[] regionName, + final byte[] row, final byte[] column, final int numVersions) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + return getTransactionalRegion(regionName).get(transactionId, row, column, + numVersions); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public Cell[] get(final long transactionId, final byte[] regionName, + final byte[] row, final byte[] column, final long timestamp, + final int numVersions) throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + return getTransactionalRegion(regionName).get(transactionId, row, column, + timestamp, numVersions); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public RowResult getRow(final long transactionId, final byte[] regionName, + final byte[] row, final long ts) throws IOException { + return getRow(transactionId, regionName, row, null, ts); + } + + public RowResult getRow(final long transactionId, final byte[] regionName, + final byte[] row, final byte[][] columns) throws IOException { + return getRow(transactionId, regionName, row, columns, + HConstants.LATEST_TIMESTAMP); + } + + public RowResult getRow(final long transactionId, final byte[] regionName, + final byte[] row, final byte[][] columns, final long ts) + throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + // convert the columns array into a set so it's easy to check later. + Set columnSet = null; + if (columns != null) { + columnSet = new TreeSet(Bytes.BYTES_COMPARATOR); + columnSet.addAll(Arrays.asList(columns)); + } + + TransactionalRegion region = getTransactionalRegion(regionName); + Map map = region.getFull(transactionId, row, columnSet, ts); + HbaseMapWritable result = new HbaseMapWritable(); + result.putAll(map); + return new RowResult(row, result); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + + } + + /** {@inheritDoc} */ + public void deleteAll(final long transactionId, final byte[] regionName, + final byte[] row, final long timestamp) throws IOException { + checkOpen(); + super.getRequestCount().incrementAndGet(); + try { + TransactionalRegion region = getTransactionalRegion(regionName); + region.deleteAll(transactionId, row, timestamp); + } catch (IOException e) { + checkFileSystem(); + throw e; + } + } + + public long openScanner(final long transactionId, final byte[] regionName, + final byte[][] cols, final byte[] firstRow, final long timestamp, + final RowFilterInterface filter) throws IOException { + checkOpen(); + NullPointerException npe = null; + if (regionName == null) { + npe = new NullPointerException("regionName is null"); + } else if (cols == null) { + npe = new NullPointerException("columns to scan is null"); + } else if (firstRow == null) { + npe = new NullPointerException("firstRow for scanner is null"); + } + if (npe != null) { + IOException io = new IOException("Invalid arguments to openScanner"); + io.initCause(npe); + throw io; + } + super.getRequestCount().incrementAndGet(); + try { + TransactionalRegion r = getTransactionalRegion(regionName); + long scannerId = -1L; + InternalScanner s = r.getScanner(transactionId, cols, firstRow, + timestamp, filter); + scannerId = super.addScanner(s); + return scannerId; + } catch (IOException e) { + 
LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")", + RemoteExceptionHandler.checkIOException(e)); + checkFileSystem(); + throw e; + } + } + + public void beginTransaction(final long transactionId, final byte[] regionName) + throws IOException { + getTransactionalRegion(regionName).beginTransaction(transactionId); + } + +} diff --git a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java index 561030354da..a770ea97130 100644 --- a/src/test/org/apache/hadoop/hbase/HBaseTestCase.java +++ b/src/test/org/apache/hadoop/hbase/HBaseTestCase.java @@ -167,9 +167,11 @@ public abstract class HBaseTestCase extends TestCase { protected HRegion openClosedRegion(final HRegion closedRegion) throws IOException { - return new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(), - closedRegion.getFilesystem(), closedRegion.getConf(), - closedRegion.getRegionInfo(), null, null); + HRegion r = new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(), + closedRegion.getFilesystem(), closedRegion.getConf(), + closedRegion.getRegionInfo(), null); + r.initialize(null, null); + return r; } /** diff --git a/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java b/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java new file mode 100644 index 00000000000..a7bf60f6c82 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/client/transactional/StressTestTransactions.java @@ -0,0 +1,421 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client.transactional; + +import java.io.IOException; +import java.util.LinkedList; +import java.util.List; +import java.util.Random; +import java.util.concurrent.atomic.AtomicBoolean; + +import junit.framework.Assert; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.HBaseClusterTestCase; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; +import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Stress Test the transaction functionality. This requires to run an + * {@link TransactionalRegionServer}. We run many threads doing reads/writes + * which may conflict with each other. We have two types of transactions, those + * which operate on rows of a single table, and those which operate on rows + * across multiple tables. 
Each transaction type has a modification operation + * which changes two values while maintaining the sum. Also each transaction + * type has a consistency-check operation which sums all rows and verifies that + * the sum is as expected. + */ +public class StressTestTransactions extends HBaseClusterTestCase { + private static final Log LOG = LogFactory + .getLog(StressTestTransactions.class); + + private static final int NUM_TABLES = 3; + private static final int NUM_ST_ROWS = 3; + private static final int NUM_MT_ROWS = 3; + private static final int NUM_TRANSACTIONS_PER_THREAD = 100; + private static final int NUM_SINGLE_TABLE_THREADS = 6; + private static final int NUM_MULTI_TABLE_THREADS = 6; + private static final int PRE_COMMIT_SLEEP = 10; + private static final Random RAND = new Random(); + + private static final byte[] FAMILY = Bytes.toBytes("family:"); + private static final byte[] COL = Bytes.toBytes("family:a"); + + private HBaseAdmin admin; + private TransactionalTable[] tables; + private TransactionManager transactionManager; + + /** constructor */ + public StressTestTransactions() { + conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class + .getName()); + conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class + .getName()); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + tables = new TransactionalTable[NUM_TABLES]; + + for (int i = 0; i < tables.length; i++) { + HTableDescriptor desc = new HTableDescriptor(makeTableName(i)); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin = new HBaseAdmin(conf); + admin.createTable(desc); + tables[i] = new TransactionalTable(conf, desc.getName()); + } + + transactionManager = new TransactionManager(conf); + } + + private String makeTableName(final int i) { + return "table" + i; + } + + private void writeInitalValues() throws IOException { + for (TransactionalTable table : tables) { + for (int i = 0; i < NUM_ST_ROWS; i++) { + byte[] row = makeSTRow(i); + BatchUpdate b = new BatchUpdate(row); + b.put(COL, Bytes.toBytes(SingleTableTransactionThread.INITIAL_VALUE)); + table.commit(b); + } + for (int i = 0; i < NUM_MT_ROWS; i++) { + byte[] row = makeMTRow(i); + BatchUpdate b = new BatchUpdate(row); + b.put(COL, Bytes.toBytes(MultiTableTransactionThread.INITIAL_VALUE)); + table.commit(b); + } + } + } + + private byte[] makeSTRow(final int i) { + return Bytes.toBytes("st" + i); + } + + private byte[] makeMTRow(final int i) { + return Bytes.toBytes("mt" + i); + } + + private static int nextThreadNum = 1; + private static final AtomicBoolean stopRequest = new AtomicBoolean(false); + private static final AtomicBoolean consistencyFailure = new AtomicBoolean( + false); + + // Thread which runs transactions + abstract class TransactionThread extends Thread { + private int numRuns = 0; + private int numAborts = 0; + private int numUnknowns = 0; + + public TransactionThread(final String namePrefix) { + super.setName(namePrefix + "transaction " + nextThreadNum++); + } + + @Override + public void run() { + for (int i = 0; i < NUM_TRANSACTIONS_PER_THREAD; i++) { + if (stopRequest.get()) { + return; + } + try { + numRuns++; + transaction(); + } catch (UnknownTransactionException e) { + numUnknowns++; + } catch (IOException e) { + throw new RuntimeException(e); + } catch (CommitUnsuccessfulException e) { + numAborts++; + } + } + } + + protected abstract void transaction() throws IOException, + CommitUnsuccessfulException; + + public int getNumAborts() { + return numAborts; + } + + 
public int getNumUnknowns() {
+ return numUnknowns;
+ }
+
+ protected void preCommitSleep() {
+ try {
+ Thread.sleep(PRE_COMMIT_SLEEP);
+ } catch (InterruptedException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected void consistencyFailure() {
+ LOG.fatal("Consistency failure");
+ stopRequest.set(true);
+ consistencyFailure.set(true);
+ }
+
+ /**
+ * Get the numRuns.
+ *
+ * @return Return the numRuns.
+ */
+ public int getNumRuns() {
+ return numRuns;
+ }
+
+ }
+
+ // Atomically change the value of two rows while maintaining the sum.
+ // This should preserve the global sum of the rows, which is also checked
+ // with a transaction.
+ private class SingleTableTransactionThread extends TransactionThread {
+ private static final int INITIAL_VALUE = 10;
+ public static final int TOTAL_SUM = INITIAL_VALUE * NUM_ST_ROWS;
+ private static final int MAX_TRANSFER_AMT = 100;
+
+ private TransactionalTable table;
+ boolean doCheck = false;
+
+ public SingleTableTransactionThread() {
+ super("single table ");
+ }
+
+ @Override
+ protected void transaction() throws IOException,
+ CommitUnsuccessfulException {
+ if (doCheck) {
+ checkTotalSum();
+ } else {
+ doSingleRowChange();
+ }
+ doCheck = !doCheck;
+ }
+
+ private void doSingleRowChange() throws IOException,
+ CommitUnsuccessfulException {
+ table = tables[RAND.nextInt(NUM_TABLES)];
+ int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
+ - MAX_TRANSFER_AMT;
+ int row1Index = RAND.nextInt(NUM_ST_ROWS);
+ int row2Index;
+ do {
+ row2Index = RAND.nextInt(NUM_ST_ROWS);
+ } while (row2Index == row1Index);
+ byte[] row1 = makeSTRow(row1Index);
+ byte[] row2 = makeSTRow(row2Index);
+
+ TransactionState transactionState = transactionManager.beginTransaction();
+ int row1Amount = Bytes.toInt(table.get(transactionState, row1, COL)
+ .getValue());
+ int row2Amount = Bytes.toInt(table.get(transactionState, row2, COL)
+ .getValue());
+
+ row1Amount -= transferAmount;
+ row2Amount += transferAmount;
+
+ BatchUpdate update = new BatchUpdate(row1);
+ update.put(COL, Bytes.toBytes(row1Amount));
+ table.commit(transactionState, update);
+ update = new BatchUpdate(row2);
+ update.put(COL, Bytes.toBytes(row2Amount));
+ table.commit(transactionState, update);
+
+ super.preCommitSleep();
+
+ transactionManager.tryCommit(transactionState);
+ LOG.debug("Committed");
+ }
+
+ // Check the table we last mutated
+ private void checkTotalSum() throws IOException,
+ CommitUnsuccessfulException {
+ TransactionState transactionState = transactionManager.beginTransaction();
+ int totalSum = 0;
+ for (int i = 0; i < NUM_ST_ROWS; i++) {
+ totalSum += Bytes.toInt(table.get(transactionState, makeSTRow(i), COL)
+ .getValue());
+ }
+
+ transactionManager.tryCommit(transactionState);
+ if (TOTAL_SUM != totalSum) {
+ super.consistencyFailure();
+ }
+ }
+
+ }
+
+ // Similar to SingleTable, but this time we maintain consistency across tables
+ // rather than rows
+ private class MultiTableTransactionThread extends TransactionThread {
+ private static final int INITIAL_VALUE = 1000;
+ public static final int TOTAL_SUM = INITIAL_VALUE * NUM_TABLES;
+ private static final int MAX_TRANSFER_AMT = 100;
+
+ private byte[] row;
+ boolean doCheck = false;
+
+ public MultiTableTransactionThread() {
+ super("multi table");
+ }
+
+ @Override
+ protected void transaction() throws IOException,
+ CommitUnsuccessfulException {
+ if (doCheck) {
+ checkTotalSum();
+ } else {
+ doSingleRowChange();
+ }
+ doCheck = !doCheck;
+ }
+
+ private void doSingleRowChange() throws IOException,
+
CommitUnsuccessfulException { + row = makeMTRow(RAND.nextInt(NUM_MT_ROWS)); + int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2) + - MAX_TRANSFER_AMT; + int table1Index = RAND.nextInt(tables.length); + int table2Index; + do { + table2Index = RAND.nextInt(tables.length); + } while (table2Index == table1Index); + + TransactionalTable table1 = tables[table1Index]; + TransactionalTable table2 = tables[table2Index]; + + TransactionState transactionState = transactionManager.beginTransaction(); + int table1Amount = Bytes.toInt(table1.get(transactionState, row, COL) + .getValue()); + int table2Amount = Bytes.toInt(table2.get(transactionState, row, COL) + .getValue()); + + table1Amount -= transferAmount; + table2Amount += transferAmount; + + BatchUpdate update = new BatchUpdate(row); + update.put(COL, Bytes.toBytes(table1Amount)); + table1.commit(transactionState, update); + + update = new BatchUpdate(row); + update.put(COL, Bytes.toBytes(table2Amount)); + table2.commit(transactionState, update); + + super.preCommitSleep(); + + transactionManager.tryCommit(transactionState); + + LOG.trace(Bytes.toString(table1.getTableName()) + ": " + table1Amount); + LOG.trace(Bytes.toString(table2.getTableName()) + ": " + table2Amount); + + } + + private void checkTotalSum() throws IOException, + CommitUnsuccessfulException { + TransactionState transactionState = transactionManager.beginTransaction(); + int totalSum = 0; + int[] amounts = new int[tables.length]; + for (int i = 0; i < tables.length; i++) { + int amount = Bytes.toInt(tables[i].get(transactionState, row, COL) + .getValue()); + amounts[i] = amount; + totalSum += amount; + } + + transactionManager.tryCommit(transactionState); + + for (int i = 0; i < tables.length; i++) { + LOG.trace(Bytes.toString(tables[i].getTableName()) + ": " + amounts[i]); + } + + if (TOTAL_SUM != totalSum) { + super.consistencyFailure(); + } + } + + } + + public void testStressTransactions() throws IOException, InterruptedException { + writeInitalValues(); + + List transactionThreads = new LinkedList(); + + for (int i = 0; i < NUM_SINGLE_TABLE_THREADS; i++) { + TransactionThread transactionThread = new SingleTableTransactionThread(); + transactionThread.start(); + transactionThreads.add(transactionThread); + } + + for (int i = 0; i < NUM_MULTI_TABLE_THREADS; i++) { + TransactionThread transactionThread = new MultiTableTransactionThread(); + transactionThread.start(); + transactionThreads.add(transactionThread); + } + + for (TransactionThread transactionThread : transactionThreads) { + transactionThread.join(); + } + + for (TransactionThread transactionThread : transactionThreads) { + LOG.info(transactionThread.getName() + " done with " + + transactionThread.getNumAborts() + " aborts, and " + + transactionThread.getNumUnknowns() + " unknown transactions of " + + transactionThread.getNumRuns()); + } + + doFinalConsistencyChecks(); + } + + private void doFinalConsistencyChecks() throws IOException { + + int[] mtSums = new int[NUM_MT_ROWS]; + for (int i = 0; i < mtSums.length; i++) { + mtSums[i] = 0; + } + + for (TransactionalTable table : tables) { + int thisTableSum = 0; + for (int i = 0; i < NUM_ST_ROWS; i++) { + byte[] row = makeSTRow(i); + thisTableSum += Bytes.toInt(table.get(row, COL).getValue()); + } + Assert.assertEquals(SingleTableTransactionThread.TOTAL_SUM, thisTableSum); + + for (int i = 0; i < NUM_MT_ROWS; i++) { + byte[] row = makeMTRow(i); + mtSums[i] += Bytes.toInt(table.get(row, COL).getValue()); + } + } + + for (int mtSum : mtSums) { + 
Assert.assertEquals(MultiTableTransactionThread.TOTAL_SUM, mtSum);
+ }
+ }
+}
diff --git a/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java b/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
new file mode 100644
index 00000000000..f9bd912e722
--- /dev/null
+++ b/src/test/org/apache/hadoop/hbase/client/transactional/TestTransactions.java
@@ -0,0 +1,143 @@
+/**
+ * Copyright 2008 The Apache Software Foundation
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.client.transactional;
+
+import java.io.IOException;
+
+import org.apache.hadoop.hbase.HBaseClusterTestCase;
+import org.apache.hadoop.hbase.HColumnDescriptor;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HTableDescriptor;
+import org.apache.hadoop.hbase.client.HBaseAdmin;
+import org.apache.hadoop.hbase.io.BatchUpdate;
+import org.apache.hadoop.hbase.io.Cell;
+import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
+import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
+import org.apache.hadoop.hbase.util.Bytes;
+
+/**
+ * Test the transaction functionality. This requires a running
+ * {@link TransactionalRegionServer}.
+ */ +public class TestTransactions extends HBaseClusterTestCase { + + private static final String TABLE_NAME = "table1"; + + private static final byte[] FAMILY = Bytes.toBytes("family:"); + private static final byte[] COL_A = Bytes.toBytes("family:a"); + + private static final byte[] ROW1 = Bytes.toBytes("row1"); + private static final byte[] ROW2 = Bytes.toBytes("row2"); + private static final byte[] ROW3 = Bytes.toBytes("row3"); + + private HBaseAdmin admin; + private TransactionalTable table; + private TransactionManager transactionManager; + + /** constructor */ + public TestTransactions() { + conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class + .getName()); + conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class + .getName()); + } + + @Override + protected void setUp() throws Exception { + super.setUp(); + + HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin = new HBaseAdmin(conf); + admin.createTable(desc); + table = new TransactionalTable(conf, desc.getName()); + + transactionManager = new TransactionManager(conf); + writeInitalRow(); + } + + private void writeInitalRow() throws IOException { + BatchUpdate update = new BatchUpdate(ROW1); + update.put(COL_A, Bytes.toBytes(1)); + table.commit(update); + } + + public void testSimpleTransaction() throws IOException, + CommitUnsuccessfulException { + TransactionState transactionState = makeTransaction1(); + transactionManager.tryCommit(transactionState); + } + + public void testTwoTransactionsWithoutConflict() throws IOException, + CommitUnsuccessfulException { + TransactionState transactionState1 = makeTransaction1(); + TransactionState transactionState2 = makeTransaction2(); + + transactionManager.tryCommit(transactionState1); + transactionManager.tryCommit(transactionState2); + } + + public void testTwoTransactionsWithConflict() throws IOException, + CommitUnsuccessfulException { + TransactionState transactionState1 = makeTransaction1(); + TransactionState transactionState2 = makeTransaction2(); + + transactionManager.tryCommit(transactionState2); + + try { + transactionManager.tryCommit(transactionState1); + fail(); + } catch (CommitUnsuccessfulException e) { + // Good + } + } + + // Read from ROW1,COL_A and put it in ROW2_COLA and ROW3_COLA + private TransactionState makeTransaction1() throws IOException { + TransactionState transactionState = transactionManager.beginTransaction(); + + Cell row1_A = table.get(transactionState, ROW1, COL_A); + + BatchUpdate write1 = new BatchUpdate(ROW2); + write1.put(COL_A, row1_A.getValue()); + table.commit(transactionState, write1); + + BatchUpdate write2 = new BatchUpdate(ROW3); + write2.put(COL_A, row1_A.getValue()); + table.commit(transactionState, write2); + + return transactionState; + } + + // Read ROW1,COL_A, increment its (integer) value, write back + private TransactionState makeTransaction2() throws IOException { + TransactionState transactionState = transactionManager.beginTransaction(); + + Cell row1_A = table.get(transactionState, ROW1, COL_A); + + int value = Bytes.toInt(row1_A.getValue()); + + BatchUpdate write = new BatchUpdate(ROW1); + write.put(COL_A, Bytes.toBytes(value + 1)); + table.commit(transactionState, write); + + return transactionState; + } +} diff --git a/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java b/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java new file mode 100644 index 
00000000000..fb2c213c6b2 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestHLogRecovery.java @@ -0,0 +1,285 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.io.IOException; +import java.util.Collection; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseClusterTestCase; +import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.LocalHBaseCluster; +import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.client.HTable; +import org.apache.hadoop.hbase.client.Scanner; +import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException; +import org.apache.hadoop.hbase.client.transactional.TransactionManager; +import org.apache.hadoop.hbase.client.transactional.TransactionState; +import org.apache.hadoop.hbase.client.transactional.TransactionalTable; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.HRegionServer; +import org.apache.hadoop.hbase.util.Bytes; + +public class TestHLogRecovery extends HBaseClusterTestCase { + private static final Log LOG = LogFactory.getLog(TestHLogRecovery.class); + + private static final String TABLE_NAME = "table1"; + + private static final byte[] FAMILY = Bytes.toBytes("family:"); + private static final byte[] COL_A = Bytes.toBytes("family:a"); + + private static final byte[] ROW1 = Bytes.toBytes("row1"); + private static final byte[] ROW2 = Bytes.toBytes("row2"); + private static final byte[] ROW3 = Bytes.toBytes("row3"); + private static final int TOTAL_VALUE = 10; + + private HBaseAdmin admin; + private TransactionManager transactionManager; + private TransactionalTable table; + + /** constructor */ + public TestHLogRecovery() { + super(2, false); + + conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class + .getName()); + conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class + .getName()); + + // Set flush params so we don't get any + // FIXME (defaults are probably fine) + + // Copied from TestRegionServerExit + conf.setInt("ipc.client.connect.max.retries", 5); // reduce ipc retries + conf.setInt("ipc.client.timeout", 10000); // and ipc timeout + conf.setInt("hbase.client.pause", 10000); // increase client timeout + 
conf.setInt("hbase.client.retries.number", 10); // increase HBase retries + } + + @Override + protected void setUp() throws Exception { + FileSystem.getLocal(conf).delete(new Path(conf.get(HConstants.HBASE_DIR)), true); + super.setUp(); + + HTableDescriptor desc = new HTableDescriptor(TABLE_NAME); + desc.addFamily(new HColumnDescriptor(FAMILY)); + admin = new HBaseAdmin(conf); + admin.createTable(desc); + table = new TransactionalTable(conf, desc.getName()); + + transactionManager = new TransactionManager(conf); + writeInitalRows(); + } + + private void writeInitalRows() throws IOException { + BatchUpdate update = new BatchUpdate(ROW1); + update.put(COL_A, Bytes.toBytes(TOTAL_VALUE)); + table.commit(update); + update = new BatchUpdate(ROW2); + update.put(COL_A, Bytes.toBytes(0)); + table.commit(update); + update = new BatchUpdate(ROW3); + update.put(COL_A, Bytes.toBytes(0)); + table.commit(update); + } + + public void testWithoutFlush() throws IOException, + CommitUnsuccessfulException { + writeInitalRows(); + TransactionState state1 = makeTransaction(false); + transactionManager.tryCommit(state1); + stopOrAbortRegionServer(true); + + Thread t = startVerificationThread(1); + t.start(); + threadDumpingJoin(t); + } + + public void testWithFlushBeforeCommit() throws IOException, + CommitUnsuccessfulException { + writeInitalRows(); + TransactionState state1 = makeTransaction(false); + flushRegionServer(); + transactionManager.tryCommit(state1); + stopOrAbortRegionServer(true); + + Thread t = startVerificationThread(1); + t.start(); + threadDumpingJoin(t); + } + + // FIXME, TODO + // public void testWithFlushBetweenTransactionWrites() { + // fail(); + // } + + private void flushRegionServer() { + List regionThreads = cluster + .getRegionThreads(); + + HRegion region = null; + int server = -1; + for (int i = 0; i < regionThreads.size() && server == -1; i++) { + HRegionServer s = regionThreads.get(i).getRegionServer(); + Collection regions = s.getOnlineRegions(); + for (HRegion r : regions) { + if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) { + server = i; + region = r; + } + } + } + if (server == -1) { + LOG.fatal("could not find region server serving table region"); + fail(); + } + ((TransactionalRegionServer) regionThreads.get(server).getRegionServer()) + .getFlushRequester().request(region); + } + + /** + * Stop the region server serving TABLE_NAME. + * + * @param abort set to true if region server should be aborted, if false it is + * just shut down. + */ + private void stopOrAbortRegionServer(final boolean abort) { + List regionThreads = cluster + .getRegionThreads(); + + int server = -1; + for (int i = 0; i < regionThreads.size(); i++) { + HRegionServer s = regionThreads.get(i).getRegionServer(); + Collection regions = s.getOnlineRegions(); + LOG.info("server: " + regionThreads.get(i).getName()); + for (HRegion r : regions) { + LOG.info("region: " + r.getRegionInfo().getRegionNameAsString()); + if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) { + server = i; + } + } + } + if (server == -1) { + LOG.fatal("could not find region server serving table region"); + fail(); + } + if (abort) { + this.cluster.abortRegionServer(server); + + } else { + this.cluster.stopRegionServer(server); + } + LOG.info(this.cluster.waitOnRegionServer(server) + " has been " + + (abort ? 
"aborted" : "shut down")); + } + + private void verify(final int numRuns) throws IOException { + // Reads + int row1 = Bytes.toInt(table.get(ROW1, COL_A).getValue()); + int row2 = Bytes.toInt(table.get(ROW2, COL_A).getValue()); + int row3 = Bytes.toInt(table.get(ROW3, COL_A).getValue()); + + assertEquals(TOTAL_VALUE - 2 * numRuns, row1); + assertEquals(numRuns, row2); + assertEquals(numRuns, row3); + } + + // Move 2 out of ROW1 and 1 into ROW2 and 1 into ROW3 + private TransactionState makeTransaction(final boolean flushMidWay) + throws IOException { + TransactionState transactionState = transactionManager.beginTransaction(); + + // Reads + int row1 = Bytes.toInt(table.get(transactionState, ROW1, COL_A).getValue()); + int row2 = Bytes.toInt(table.get(transactionState, ROW2, COL_A).getValue()); + int row3 = Bytes.toInt(table.get(transactionState, ROW3, COL_A).getValue()); + + row1 -= 2; + row2 += 1; + row3 += 1; + + if (flushMidWay) { + flushRegionServer(); + } + + // Writes + BatchUpdate write = new BatchUpdate(ROW1); + write.put(COL_A, Bytes.toBytes(row1)); + table.commit(transactionState, write); + + write = new BatchUpdate(ROW2); + write.put(COL_A, Bytes.toBytes(row2)); + table.commit(transactionState, write); + + write = new BatchUpdate(ROW3); + write.put(COL_A, Bytes.toBytes(row3)); + table.commit(transactionState, write); + + return transactionState; + } + + /* + * Run verification in a thread so I can concurrently run a thread-dumper + * while we're waiting (because in this test sometimes the meta scanner looks + * to be be stuck). @param tableName Name of table to find. @param row Row we + * expect to find. @return Verification thread. Caller needs to calls start on + * it. + */ + private Thread startVerificationThread(final int numRuns) { + Runnable runnable = new Runnable() { + public void run() { + try { + // Now try to open a scanner on the meta table. Should stall until + // meta server comes back up. + HTable t = new HTable(conf, TABLE_NAME); + Scanner s = t.getScanner(new byte[][] { COL_A }, + HConstants.EMPTY_START_ROW); + s.close(); + + } catch (IOException e) { + LOG.fatal("could not re-open meta table because", e); + fail(); + } + Scanner scanner = null; + try { + verify(numRuns); + LOG.info("Success!"); + } catch (Exception e) { + e.printStackTrace(); + fail(); + } finally { + if (scanner != null) { + LOG.info("Closing scanner " + scanner); + scanner.close(); + } + } + } + }; + return new Thread(runnable); + } +} diff --git a/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTransactionalHLogManager.java b/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTransactionalHLogManager.java new file mode 100644 index 00000000000..7278647f407 --- /dev/null +++ b/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTransactionalHLogManager.java @@ -0,0 +1,310 @@ +/** + * Copyright 2008 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver.transactional; + +import java.io.IOException; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.dfs.MiniDFSCluster; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HBaseTestCase; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HTableDescriptor; +import org.apache.hadoop.hbase.io.BatchUpdate; +import org.apache.hadoop.hbase.regionserver.HLog; +import org.apache.hadoop.hbase.util.Bytes; + +/** JUnit test case for HLog */ +public class TestTransactionalHLogManager extends HBaseTestCase implements + HConstants { + private Path dir; + private MiniDFSCluster cluster; + + final byte[] tableName = Bytes.toBytes("tablename"); + final HTableDescriptor tableDesc = new HTableDescriptor(tableName); + final HRegionInfo regionInfo = new HRegionInfo(tableDesc, + HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW); + final byte[] row1 = Bytes.toBytes("row1"); + final byte[] val1 = Bytes.toBytes("val1"); + final byte[] row2 = Bytes.toBytes("row2"); + final byte[] val2 = Bytes.toBytes("val2"); + final byte[] row3 = Bytes.toBytes("row3"); + final byte[] val3 = Bytes.toBytes("val3"); + final byte[] col = Bytes.toBytes("col:A"); + + /** {@inheritDoc} */ + @Override + public void setUp() throws Exception { + cluster = new MiniDFSCluster(conf, 2, true, (String[]) null); + // Set the hbase.rootdir to be the home directory in mini dfs. + this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem() + .getHomeDirectory().toString()); + super.setUp(); + this.dir = new Path("/hbase", getName()); + if (fs.exists(dir)) { + fs.delete(dir, true); + } + } + + /** {@inheritDoc} */ + @Override + public void tearDown() throws Exception { + if (this.fs.exists(this.dir)) { + this.fs.delete(this.dir, true); + } + shutdownDfs(cluster); + super.tearDown(); + } + + /** + * @throws IOException + */ + public void testSingleCommit() throws IOException { + + HLog log = new HLog(fs, dir, this.conf, null); + TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs, + regionInfo, conf); + + // Write columns named 1, 2, 3, etc. and then values of single byte + // 1, 2, 3... 
+ long transactionId = 1; + logMangaer.writeStartToLog(transactionId); + + BatchUpdate update1 = new BatchUpdate(row1); + update1.put(col, val1); + logMangaer.writeUpdateToLog(transactionId, update1); + + BatchUpdate update2 = new BatchUpdate(row2); + update2.put(col, val2); + logMangaer.writeUpdateToLog(transactionId, update2); + + BatchUpdate update3 = new BatchUpdate(row3); + update3.put(col, val3); + logMangaer.writeUpdateToLog(transactionId, update3); + + logMangaer.writeCommitToLog(transactionId); + + // log.completeCacheFlush(regionName, tableName, logSeqId); + + log.close(); + Path filename = log.computeFilename(log.getFilenum()); + + Map> commits = logMangaer.getCommitsFromLog( + filename, -1, null); + + assertEquals(1, commits.size()); + assertTrue(commits.containsKey(transactionId)); + assertEquals(3, commits.get(transactionId).size()); + + List updates = commits.get(transactionId); + + update1 = updates.get(0); + assertTrue(Bytes.equals(row1, update1.getRow())); + assertTrue(Bytes.equals(val1, update1.iterator().next().getValue())); + + update2 = updates.get(1); + assertTrue(Bytes.equals(row2, update2.getRow())); + assertTrue(Bytes.equals(val2, update2.iterator().next().getValue())); + + update3 = updates.get(2); + assertTrue(Bytes.equals(row3, update3.getRow())); + assertTrue(Bytes.equals(val3, update3.iterator().next().getValue())); + + } + + /** + * @throws IOException + */ + public void testSingleAbort() throws IOException { + + HLog log = new HLog(fs, dir, this.conf, null); + TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs, + regionInfo, conf); + + long transactionId = 1; + logMangaer.writeStartToLog(transactionId); + + BatchUpdate update1 = new BatchUpdate(row1); + update1.put(col, val1); + logMangaer.writeUpdateToLog(transactionId, update1); + + BatchUpdate update2 = new BatchUpdate(row2); + update2.put(col, val2); + logMangaer.writeUpdateToLog(transactionId, update2); + + BatchUpdate update3 = new BatchUpdate(row3); + update3.put(col, val3); + logMangaer.writeUpdateToLog(transactionId, update3); + + logMangaer.writeAbortToLog(transactionId); + + // log.completeCacheFlush(regionName, tableName, logSeqId); + + log.close(); + Path filename = log.computeFilename(log.getFilenum()); + + Map> commits = logMangaer.getCommitsFromLog( + filename, -1, null); + + assertEquals(0, commits.size()); + } + + /** + * @throws IOException + */ + public void testInterlievedCommits() throws IOException { + + HLog log = new HLog(fs, dir, this.conf, null); + TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs, + regionInfo, conf); + + long transaction1Id = 1; + long transaction2Id = 2; + logMangaer.writeStartToLog(transaction1Id); + + BatchUpdate update1 = new BatchUpdate(row1); + update1.put(col, val1); + logMangaer.writeUpdateToLog(transaction1Id, update1); + + logMangaer.writeStartToLog(transaction2Id); + + BatchUpdate update2 = new BatchUpdate(row2); + update2.put(col, val2); + logMangaer.writeUpdateToLog(transaction2Id, update2); + + BatchUpdate update3 = new BatchUpdate(row3); + update3.put(col, val3); + logMangaer.writeUpdateToLog(transaction1Id, update3); + + logMangaer.writeCommitToLog(transaction2Id); + logMangaer.writeCommitToLog(transaction1Id); + + // log.completeCacheFlush(regionName, tableName, logSeqId); + + log.close(); + Path filename = log.computeFilename(log.getFilenum()); + + Map> commits = logMangaer.getCommitsFromLog( + filename, -1, null); + + assertEquals(2, commits.size()); + assertEquals(2, 
commits.get(transaction1Id).size()); + assertEquals(1, commits.get(transaction2Id).size()); + } + + /** + * @throws IOException + */ + public void testInterlievedAbortCommit() throws IOException { + + HLog log = new HLog(fs, dir, this.conf, null); + TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs, + regionInfo, conf); + + long transaction1Id = 1; + long transaction2Id = 2; + logMangaer.writeStartToLog(transaction1Id); + + BatchUpdate update1 = new BatchUpdate(row1); + update1.put(col, val1); + logMangaer.writeUpdateToLog(transaction1Id, update1); + + logMangaer.writeStartToLog(transaction2Id); + + BatchUpdate update2 = new BatchUpdate(row2); + update2.put(col, val2); + logMangaer.writeUpdateToLog(transaction2Id, update2); + + logMangaer.writeAbortToLog(transaction2Id); + + BatchUpdate update3 = new BatchUpdate(row3); + update3.put(col, val3); + logMangaer.writeUpdateToLog(transaction1Id, update3); + + logMangaer.writeCommitToLog(transaction1Id); + + // log.completeCacheFlush(regionName, tableName, logSeqId); + + log.close(); + Path filename = log.computeFilename(log.getFilenum()); + + Map> commits = logMangaer.getCommitsFromLog( + filename, -1, null); + + assertEquals(1, commits.size()); + assertEquals(2, commits.get(transaction1Id).size()); + } + + /** + * @throws IOException + */ + public void testInterlievedCommitAbort() throws IOException { + + HLog log = new HLog(fs, dir, this.conf, null); + TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs, + regionInfo, conf); + + long transaction1Id = 1; + long transaction2Id = 2; + logMangaer.writeStartToLog(transaction1Id); + + BatchUpdate update1 = new BatchUpdate(row1); + update1.put(col, val1); + logMangaer.writeUpdateToLog(transaction1Id, update1); + + logMangaer.writeStartToLog(transaction2Id); + + BatchUpdate update2 = new BatchUpdate(row2); + update2.put(col, val2); + logMangaer.writeUpdateToLog(transaction2Id, update2); + + logMangaer.writeCommitToLog(transaction2Id); + + BatchUpdate update3 = new BatchUpdate(row3); + update3.put(col, val3); + logMangaer.writeUpdateToLog(transaction1Id, update3); + + logMangaer.writeAbortToLog(transaction1Id); + + // log.completeCacheFlush(regionName, tableName, logSeqId); + + log.close(); + Path filename = log.computeFilename(log.getFilenum()); + + Map> commits = logMangaer.getCommitsFromLog( + filename, -1, null); + + assertEquals(1, commits.size()); + assertEquals(1, commits.get(transaction2Id).size()); + } + + // FIXME Cannot do this test without a global transacton manager + // public void testMissingCommit() { + // fail(); + // } + + // FIXME Cannot do this test without a global transacton manager + // public void testMissingAbort() { + // fail(); + // } + +}
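
For orientation, here is a minimal sketch (not part of the patch) of how client code might drive the transactional API this change introduces, following the same pattern as the tests above: begin a transaction with TransactionManager, tag every TransactionalTable read and write with the returned TransactionState, and finish with tryCommit, retrying the whole transaction if CommitUnsuccessfulException reports a conflict. The table "accounts", its rows, and the column "family:balance" are hypothetical placeholders; only the transactional classes and calls exercised in the tests are assumed.

import java.io.IOException;

import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
import org.apache.hadoop.hbase.client.transactional.TransactionManager;
import org.apache.hadoop.hbase.client.transactional.TransactionState;
import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
import org.apache.hadoop.hbase.util.Bytes;

public class TransferSketch {
  public static void main(String[] args) throws IOException {
    HBaseConfiguration conf = new HBaseConfiguration();
    // The cluster must run the transactional region server and interface,
    // as the tests configure in their constructors.
    conf.set(HConstants.REGION_SERVER_CLASS,
        TransactionalRegionInterface.class.getName());
    conf.set(HConstants.REGION_SERVER_IMPL,
        TransactionalRegionServer.class.getName());

    // Hypothetical table and column names.
    TransactionalTable table = new TransactionalTable(conf,
        Bytes.toBytes("accounts"));
    TransactionManager transactionManager = new TransactionManager(conf);

    byte[] col = Bytes.toBytes("family:balance");
    byte[] from = Bytes.toBytes("row1");
    byte[] to = Bytes.toBytes("row2");

    // Begin a transaction and tag every read and write with its state.
    TransactionState transactionState = transactionManager.beginTransaction();
    int fromBalance = Bytes.toInt(table.get(transactionState, from, col)
        .getValue());
    int toBalance = Bytes.toInt(table.get(transactionState, to, col)
        .getValue());

    BatchUpdate debit = new BatchUpdate(from);
    debit.put(col, Bytes.toBytes(fromBalance - 10));
    table.commit(transactionState, debit);

    BatchUpdate credit = new BatchUpdate(to);
    credit.put(col, Bytes.toBytes(toBalance + 10));
    table.commit(transactionState, credit);

    try {
      // Optimistic concurrency control: the commit is rejected if a
      // conflicting transaction committed first; the caller retries.
      transactionManager.tryCommit(transactionState);
    } catch (CommitUnsuccessfulException e) {
      // Conflict detected at commit time; re-run the whole transaction.
    }
  }
}

Writes issued through commit(transactionState, update) are associated with the transaction and only take effect once tryCommit succeeds, which is the behavior TestHLogRecovery above exercises across a region server restart.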