HBASE-669 MultiRegion transactions with Optimistic Concurrency Control

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@686650 13f79535-47bb-0310-9956-ffa450edef68
Michael Stack 2008-08-17 22:03:42 +00:00
parent e93a9b25b6
commit 462fe1e5dc
30 changed files with 4145 additions and 105 deletions

View File

@ -49,6 +49,8 @@ Release 0.3.0 - Unreleased
were present in previous versions of the patches for this issue,
but not in the version that was committed. Also fix a number of
compilation problems that were introduced by patch.
HBASE-669 MultiRegion transactions with Optimistic Concurrency Control
(Clint Morgan via Stack)
OPTIMIZATIONS

View File

@ -150,12 +150,13 @@ class HMerge implements HConstants {
for (int i = 0; i < info.length - 1; i++) {
if (currentRegion == null) {
currentRegion =
new HRegion(tabledir, hlog, fs, conf, info[i], null, null);
new HRegion(tabledir, hlog, fs, conf, info[i], null);
currentRegion.initialize(null, null);
currentSize = currentRegion.getLargestHStoreSize();
}
nextRegion =
new HRegion(tabledir, hlog, fs, conf, info[i + 1], null, null);
new HRegion(tabledir, hlog, fs, conf, info[i + 1], null);
nextRegion.initialize(null, null);
nextSize = nextRegion.getLargestHStoreSize();
if ((currentSize + nextSize) <= (maxFilesize / 2)) {
@ -322,7 +323,8 @@ class HMerge implements HConstants {
// Scan root region to find all the meta regions
root = new HRegion(rootTableDir, hlog, fs, conf,
HRegionInfo.ROOT_REGIONINFO, null, null);
HRegionInfo.ROOT_REGIONINFO, null);
root.initialize(null, null);
InternalScanner rootScanner =
root.getScanner(COL_REGIONINFO_ARRAY, HConstants.EMPTY_START_ROW,

View File

@ -982,7 +982,9 @@ public class HTable {
public Scanner getScanner(final byte [][] columns,
final byte [] startRow, long timestamp, RowFilterInterface filter)
throws IOException {
return new ClientScanner(columns, startRow, timestamp, filter);
ClientScanner s = new ClientScanner(columns, startRow, timestamp, filter);
s.initialize();
return s;
}
/**
@ -1335,15 +1337,13 @@ public class HTable {
protected RowFilterInterface filter;
protected ClientScanner(final Text [] columns, final Text startRow,
long timestamp, RowFilterInterface filter)
throws IOException {
long timestamp, RowFilterInterface filter) {
this(Bytes.toByteArrays(columns), startRow.getBytes(), timestamp,
filter);
}
protected ClientScanner(final byte[][] columns, final byte [] startRow,
final long timestamp, final RowFilterInterface filter)
throws IOException {
final long timestamp, final RowFilterInterface filter) {
if (CLIENT_LOG.isDebugEnabled()) {
CLIENT_LOG.debug("Creating scanner over " + Bytes.toString(getTableName()) +
" starting at key '" + Bytes.toString(startRow) + "'");
@ -1359,6 +1359,9 @@ public class HTable {
if (filter != null) {
filter.validate(columns);
}
}
public void initialize() throws IOException {
nextScanner();
}

View File

@ -60,11 +60,17 @@ public abstract class ServerCallable<T> implements Callable<T> {
/** @return the server name */
public String getServerName() {
if (location == null) {
return null;
}
return location.getServerAddress().toString();
}
/** @return the region name */
public byte [] getRegionName() {
public byte[] getRegionName() {
if (location == null) {
return null;
}
return location.getRegionInfo().getRegionName();
}

View File

@ -0,0 +1,43 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
/** Thrown when a transaction cannot be committed.
*
*/
public class CommitUnsuccessfulException extends Exception {
public CommitUnsuccessfulException() {
super();
}
public CommitUnsuccessfulException(String arg0, Throwable arg1) {
super(arg0, arg1);
}
public CommitUnsuccessfulException(String arg0) {
super(arg0);
}
public CommitUnsuccessfulException(Throwable arg0) {
super(arg0);
}
}

View File

@ -0,0 +1,66 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
/**
* A local, in-memory implementation of the transaction logger. Does not provide a global
* view, so it can't be relied on by region servers recovering after a failure.
*
*/
public class LocalTransactionLogger implements TransactionLogger {
private static LocalTransactionLogger instance;
public synchronized static LocalTransactionLogger getInstance() {
if (instance == null) {
instance = new LocalTransactionLogger();
}
return instance;
}
private Random random = new Random();
private Map<Long, TransactionStatus> transactionIdToStatusMap = Collections
.synchronizedMap(new HashMap<Long, TransactionStatus>());
private LocalTransactionLogger() {
// Enforce singleton
}
// Gives back random longs to minimize possibility of collision
public long createNewTransactionLog() {
long id = random.nextLong();
transactionIdToStatusMap.put(id, TransactionStatus.PENDING);
return id;
}
public TransactionStatus getStatusForTransaction(final long transactionId) {
return transactionIdToStatusMap.get(transactionId);
}
public void setStatusForTransaction(final long transactionId,
final TransactionStatus status) {
transactionIdToStatusMap.put(transactionId, status);
}
}

View File

@ -0,0 +1,45 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
/**
* Simple interface used to provide a log about transaction status. Written to
* by the client, and read by regionservers in case of failure.
*
*/
public interface TransactionLogger {
enum TransactionStatus {
PENDING, COMMITTED, ABORTED
}
/**
* Create a new transaction log. Return the transaction's globally unique id.
* Log's initial value should be PENDING
*
* @return transaction id
*/
long createNewTransactionLog();
TransactionStatus getStatusForTransaction(long transactionId);
void setStatusForTransaction(long transactionId, TransactionStatus status);
}

View File

@ -0,0 +1,145 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.ipc.RemoteException;
/**
* Transaction Manager. Responsible for committing transactions.
*
*/
public class TransactionManager {
static final Log LOG = LogFactory.getLog(TransactionManager.class);
private final HConnection connection;
private final TransactionLogger transactionLogger;
public TransactionManager(final HBaseConfiguration conf) {
this(LocalTransactionLogger.getInstance(), conf);
}
public TransactionManager(final TransactionLogger transactionLogger,
final HBaseConfiguration conf) {
this.transactionLogger = transactionLogger;
connection = HConnectionManager.getConnection(conf);
}
/**
* Called to start a transaction.
*
* @return new transaction state
*/
public TransactionState beginTransaction() {
long transactionId = transactionLogger.createNewTransactionLog();
LOG.debug("Begining transaction " + transactionId);
return new TransactionState(transactionId);
}
/**
* Try to commit a transaction.
*
* @param transactionState
* @throws CommitUnsuccessfulException if the transaction could not be committed
* @throws IOException
*/
public void tryCommit(final TransactionState transactionState)
throws CommitUnsuccessfulException, IOException {
LOG.debug("atempting to commit trasaction: " + transactionState.toString());
try {
for (HRegionLocation location : transactionState
.getParticipatingRegions()) {
TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
.getHRegionConnection(location.getServerAddress());
boolean canCommit = transactionalRegionServer.commitRequest(location
.getRegionInfo().getRegionName(), transactionState
.getTransactionId());
if (LOG.isTraceEnabled()) {
LOG.trace("Region ["
+ location.getRegionInfo().getRegionNameAsString() + "] votes "
+ (canCommit ? "to commit" : "to abort") + " transaction "
+ transactionState.getTransactionId());
}
if (!canCommit) {
LOG.debug("Aborting [" + transactionState.getTransactionId() + "]");
abort(transactionState, location);
throw new CommitUnsuccessfulException();
}
}
LOG.debug("Commiting [" + transactionState.getTransactionId() + "]");
transactionLogger.setStatusForTransaction(transactionState
.getTransactionId(), TransactionLogger.TransactionStatus.COMMITTED);
for (HRegionLocation location : transactionState
.getParticipatingRegions()) {
TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
.getHRegionConnection(location.getServerAddress());
transactionalRegionServer.commit(location.getRegionInfo()
.getRegionName(), transactionState.getTransactionId());
}
} catch (RemoteException e) {
LOG.debug("Commit of transaction [" + transactionState.getTransactionId()
+ "] was unsucsessful", e);
// FIXME, think about the what ifs
throw new CommitUnsuccessfulException(e);
}
// Tran log can be deleted now ...
}
/**
* Abort a transaction.
*
* @param transactionState
* @throws IOException
*/
public void abort(final TransactionState transactionState) throws IOException {
abort(transactionState, null);
}
private void abort(final TransactionState transactionState,
final HRegionLocation locationToIgnore) throws IOException {
transactionLogger.setStatusForTransaction(transactionState
.getTransactionId(), TransactionLogger.TransactionStatus.ABORTED);
for (HRegionLocation location : transactionState.getParticipatingRegions()) {
if (locationToIgnore != null && location.equals(locationToIgnore)) {
continue;
}
TransactionalRegionInterface transactionalRegionServer = (TransactionalRegionInterface) connection
.getHRegionConnection(location.getServerAddress());
transactionalRegionServer.abort(location.getRegionInfo().getRegionName(),
transactionState.getTransactionId());
}
}
}
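
A minimal client-side sketch of the commit flow implemented above: begin a transaction, stage writes through a TransactionalTable, then ask the TransactionManager to commit. This is an illustration, not part of the patch; the table, row, and column names are hypothetical, imports are omitted, and the BatchUpdate row constructor and put() call are assumed from the existing client API.

// Illustrative usage only -- not part of this patch.
HBaseConfiguration conf = new HBaseConfiguration();
TransactionManager manager = new TransactionManager(conf);
TransactionalTable table = new TransactionalTable(conf, Bytes.toBytes("mytable"));

TransactionState state = manager.beginTransaction();
try {
  // Stage a write; it only becomes visible if the transaction commits.
  BatchUpdate update = new BatchUpdate(Bytes.toBytes("row1"));       // hypothetical row
  update.put(Bytes.toBytes("info:balance"), Bytes.toBytes("100"));   // hypothetical column
  table.commit(state, update);

  // Ask every participating region to vote, then commit or abort.
  manager.tryCommit(state);
} catch (CommitUnsuccessfulException e) {
  // A conflicting transaction won; the participating regions were already told to abort.
} catch (IOException e) {
  manager.abort(state);
  throw e;
}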

View File

@ -0,0 +1,51 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
import java.io.IOException;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
class TransactionScannerCallable extends ScannerCallable {
private TransactionState transactionState;
TransactionScannerCallable(final TransactionState transactionState,
final HConnection connection, final byte[] tableName,
final byte[][] columns, final byte[] startRow, final long timestamp,
final RowFilterInterface filter) {
super(connection, tableName, columns, startRow, timestamp, filter);
this.transactionState = transactionState;
}
@Override
protected long openScanner() throws IOException {
if (transactionState.addRegion(location)) {
((TransactionalRegionInterface) server).beginTransaction(transactionState
.getTransactionId(), location.getRegionInfo().getRegionName());
}
return ((TransactionalRegionInterface) server).openScanner(transactionState
.getTransactionId(), this.location.getRegionInfo().getRegionName(),
getColumns(), row, getTimestamp(), getFilter());
}
}

View File

@ -0,0 +1,75 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
import java.util.HashSet;
import java.util.Set;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HRegionLocation;
/**
* Holds client-side transaction information. Clients use it as an opaque
* object passed to transaction operations.
*
*/
public class TransactionState {
static final Log LOG = LogFactory.getLog(TransactionState.class);
private final long transactionId;
private Set<HRegionLocation> participatingRegions = new HashSet<HRegionLocation>();
TransactionState(final long transactionId) {
this.transactionId = transactionId;
}
boolean addRegion(final HRegionLocation hregion) {
boolean added = participatingRegions.add(hregion);
if (added) {
LOG.debug("Adding new hregion ["
+ hregion.getRegionInfo().getRegionNameAsString()
+ "] to transaction [" + transactionId + "]");
}
return added;
}
Set<HRegionLocation> getParticipatingRegions() {
return participatingRegions;
}
/**
* Get the transactionId.
*
* @return Return the transactionId.
*/
public long getTransactionId() {
return transactionId;
}
@Override
public String toString() {
return "id: " + transactionId + ", particpants: "
+ participatingRegions.size();
}
}

View File

@ -0,0 +1,401 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.HConnection;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.client.ScannerCallable;
import org.apache.hadoop.hbase.client.ServerCallable;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.io.Text;
/**
* Table with transactional support.
*
*/
public class TransactionalTable extends HTable {
public TransactionalTable(final HBaseConfiguration conf,
final String tableName) throws IOException {
super(conf, tableName);
}
public TransactionalTable(final HBaseConfiguration conf, final Text tableName)
throws IOException {
super(conf, tableName);
}
public TransactionalTable(final HBaseConfiguration conf,
final byte[] tableName) throws IOException {
super(conf, tableName);
}
private static abstract class TransactionalServerCallable<T> extends
ServerCallable<T> {
protected TransactionState transactionState;
protected TransactionalRegionInterface getTransactionServer() {
return (TransactionalRegionInterface) server;
}
protected void recordServer() throws IOException {
if (transactionState.addRegion(location)) {
getTransactionServer().beginTransaction(
transactionState.getTransactionId(),
location.getRegionInfo().getRegionName());
}
}
public TransactionalServerCallable(final HConnection connection,
final byte[] tableName, final byte[] row,
final TransactionState transactionState) {
super(connection, tableName, row);
this.transactionState = transactionState;
}
}
/**
* Get a single value for the specified row and column
*
* @param row row key
* @param column column name
* @return value for specified row/column
* @throws IOException
*/
public Cell get(final TransactionState transactionState, final byte[] row,
final byte[] column) throws IOException {
return super.getConnection().getRegionServerWithRetries(
new TransactionalServerCallable<Cell>(super.getConnection(), super
.getTableName(), row, transactionState) {
public Cell call() throws IOException {
recordServer();
return getTransactionServer().get(
transactionState.getTransactionId(),
location.getRegionInfo().getRegionName(), row, column);
}
});
}
/**
* Get the specified number of versions of the specified row and column
*
* @param row - row key
* @param column - column name
* @param numVersions - number of versions to retrieve
* @return - array of values
* @throws IOException
*/
public Cell[] get(final TransactionState transactionState, final byte[] row,
final byte[] column, final int numVersions) throws IOException {
Cell[] values = null;
values = super.getConnection().getRegionServerWithRetries(
new TransactionalServerCallable<Cell[]>(super.getConnection(), super
.getTableName(), row, transactionState) {
public Cell[] call() throws IOException {
recordServer();
return getTransactionServer().get(
transactionState.getTransactionId(),
location.getRegionInfo().getRegionName(), row, column,
numVersions);
}
});
return values;
}
/**
* Get the specified number of versions of the specified row and column with
* the specified timestamp.
*
* @param row - row key
* @param column - column name
* @param timestamp - timestamp
* @param numVersions - number of versions to retrieve
* @return - array of values that match the above criteria
* @throws IOException
*/
public Cell[] get(final TransactionState transactionState, final byte[] row,
final byte[] column, final long timestamp, final int numVersions)
throws IOException {
Cell[] values = null;
values = super.getConnection().getRegionServerWithRetries(
new TransactionalServerCallable<Cell[]>(super.getConnection(), super
.getTableName(), row, transactionState) {
public Cell[] call() throws IOException {
recordServer();
return getTransactionServer().get(
transactionState.getTransactionId(),
location.getRegionInfo().getRegionName(), row, column,
timestamp, numVersions);
}
});
return values;
}
/**
* Get all the data for the specified row at the latest timestamp
*
* @param row row key
* @return RowResult is empty if row does not exist.
* @throws IOException
*/
public RowResult getRow(final TransactionState transactionState,
final byte[] row) throws IOException {
return getRow(transactionState, row, HConstants.LATEST_TIMESTAMP);
}
/**
* Get all the data for the specified row at a specified timestamp
*
* @param row row key
* @param ts timestamp
* @return RowResult is empty if row does not exist.
* @throws IOException
*/
public RowResult getRow(final TransactionState transactionState,
final byte[] row, final long ts) throws IOException {
return super.getConnection().getRegionServerWithRetries(
new TransactionalServerCallable<RowResult>(super.getConnection(), super
.getTableName(), row, transactionState) {
public RowResult call() throws IOException {
recordServer();
return getTransactionServer().getRow(
transactionState.getTransactionId(),
location.getRegionInfo().getRegionName(), row, ts);
}
});
}
/**
* Get selected columns for the specified row at the latest timestamp
*
* @param row row key
* @param columns Array of column names you want to retrieve.
* @return RowResult is empty if row does not exist.
* @throws IOException
*/
public RowResult getRow(final TransactionState transactionState,
final byte[] row, final byte[][] columns) throws IOException {
return getRow(transactionState, row, columns, HConstants.LATEST_TIMESTAMP);
}
/**
* Get selected columns for the specified row at a specified timestamp
*
* @param row row key
* @param columns Array of column names you want to retrieve.
* @param ts timestamp
* @return RowResult is empty if row does not exist.
* @throws IOException
*/
public RowResult getRow(final TransactionState transactionState,
final byte[] row, final byte[][] columns, final long ts)
throws IOException {
return super.getConnection().getRegionServerWithRetries(
new TransactionalServerCallable<RowResult>(super.getConnection(), super
.getTableName(), row, transactionState) {
public RowResult call() throws IOException {
recordServer();
return getTransactionServer().getRow(
transactionState.getTransactionId(),
location.getRegionInfo().getRegionName(), row, columns, ts);
}
});
}
/**
* Delete all cells that match the passed row and whose timestamp is equal-to
* or older than the passed timestamp.
*
* @param row Row to update
* @param column name of column whose value is to be deleted
* @param ts Delete all cells of the same timestamp or older.
* @throws IOException
*/
public void deleteAll(final TransactionState transactionState,
final byte[] row, final long ts) throws IOException {
super.getConnection().getRegionServerWithRetries(
new TransactionalServerCallable<Boolean>(super.getConnection(), super
.getTableName(), row, transactionState) {
public Boolean call() throws IOException {
recordServer();
getTransactionServer().deleteAll(
transactionState.getTransactionId(),
location.getRegionInfo().getRegionName(), row, ts);
return null;
}
});
}
/**
* Get a scanner on the current table starting at first row. Return the
* specified columns.
*
* @param columns columns to scan. If column name is a column family, all
* columns of the specified column family are returned. It's also possible to
* pass a regex in the column qualifier. A column qualifier is judged to be a
* regex if it contains at least one of the following characters:
* <code>\+|^&*$[]]}{)(</code>.
* @return scanner
* @throws IOException
*/
public Scanner getScanner(final TransactionState transactionState,
final byte[][] columns) throws IOException {
return getScanner(transactionState, columns, HConstants.EMPTY_START_ROW,
HConstants.LATEST_TIMESTAMP, null);
}
/**
* Get a scanner on the current table starting at the specified row. Return
* the specified columns.
*
* @param columns columns to scan. If column name is a column family, all
* columns of the specified column family are returned. It's also possible to
* pass a regex in the column qualifier. A column qualifier is judged to be a
* regex if it contains at least one of the following characters:
* <code>\+|^&*$[]]}{)(</code>.
* @param startRow starting row in table to scan
* @return scanner
* @throws IOException
*/
public Scanner getScanner(final TransactionState transactionState,
final byte[][] columns, final byte[] startRow) throws IOException {
return getScanner(transactionState, columns, startRow,
HConstants.LATEST_TIMESTAMP, null);
}
/**
* Get a scanner on the current table starting at the specified row. Return
* the specified columns.
*
* @param columns columns to scan. If column name is a column family, all
* columns of the specified column family are returned. It's also possible to
* pass a regex in the column qualifier. A column qualifier is judged to be a
* regex if it contains at least one of the following characters:
* <code>\+|^&*$[]]}{)(</code>.
* @param startRow starting row in table to scan
* @param timestamp only return results whose timestamp <= this value
* @return scanner
* @throws IOException
*/
public Scanner getScanner(final TransactionState transactionState,
final byte[][] columns, final byte[] startRow, final long timestamp)
throws IOException {
return getScanner(transactionState, columns, startRow, timestamp, null);
}
/**
* Get a scanner on the current table starting at the specified row. Return
* the specified columns.
*
* @param columns columns to scan. If column name is a column family, all
* columns of the specified column family are returned. It's also possible to
* pass a regex in the column qualifier. A column qualifier is judged to be a
* regex if it contains at least one of the following characters:
* <code>\+|^&*$[]]}{)(</code>.
* @param startRow starting row in table to scan
* @param filter a row filter using row-key regexp and/or column data filter.
* @return scanner
* @throws IOException
*/
public Scanner getScanner(final TransactionState transactionState,
final byte[][] columns, final byte[] startRow,
final RowFilterInterface filter) throws IOException {
return getScanner(transactionState, columns, startRow,
HConstants.LATEST_TIMESTAMP, filter);
}
/**
* Get a scanner on the current table starting at the specified row. Return
* the specified columns.
*
* @param columns columns to scan. If column name is a column family, all
* columns of the specified column family are returned. It's also possible to
* pass a regex in the column qualifier. A column qualifier is judged to be a
* regex if it contains at least one of the following characters:
* <code>\+|^&*$[]]}{)(</code>.
* @param startRow starting row in table to scan
* @param timestamp only return results whose timestamp <= this value
* @param filter a row filter using row-key regexp and/or column data filter.
* @return scanner
* @throws IOException
*/
public Scanner getScanner(final TransactionState transactionState,
final byte[][] columns, final byte[] startRow, final long timestamp,
final RowFilterInterface filter) throws IOException {
ClientScanner scanner = new TransactionalClientScanner(transactionState, columns, startRow,
timestamp, filter);
scanner.initialize();
return scanner;
}
/**
* Commit a BatchUpdate to the table.
*
* @param batchUpdate
* @throws IOException
*/
public synchronized void commit(final TransactionState transactionState,
final BatchUpdate batchUpdate) throws IOException {
super.getConnection().getRegionServerWithRetries(
new TransactionalServerCallable<Boolean>(super.getConnection(), super
.getTableName(), batchUpdate.getRow(), transactionState) {
public Boolean call() throws IOException {
recordServer();
getTransactionServer().batchUpdate(
transactionState.getTransactionId(),
location.getRegionInfo().getRegionName(), batchUpdate);
return null;
}
});
}
protected class TransactionalClientScanner extends HTable.ClientScanner {
private TransactionState transactionState;
protected TransactionalClientScanner(
final TransactionState transactionState, final byte[][] columns,
final byte[] startRow, final long timestamp,
final RowFilterInterface filter) {
super(columns, startRow, timestamp, filter);
this.transactionState = transactionState;
}
@Override
protected ScannerCallable getScannerCallable(final byte[] localStartKey) {
return new TransactionScannerCallable(transactionState, getConnection(),
getTableName(), getColumns(), localStartKey, getTimestamp(),
getFilter());
}
}
}

View File

@ -0,0 +1,41 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
import org.apache.hadoop.hbase.DoNotRetryIOException;
/**
* Thrown if a region server is passed an unknown transaction id
*/
public class UnknownTransactionException extends DoNotRetryIOException {
/** constructor */
public UnknownTransactionException() {
super();
}
/**
* Constructor
* @param s message
*/
public UnknownTransactionException(String s) {
super(s);
}
}

View File

@ -0,0 +1,59 @@
<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">
<html>
<!--
Licensed to the Apache Software Foundation (ASF) under one or more
contributor license agreements. See the NOTICE file distributed with
this work for additional information regarding copyright ownership.
The ASF licenses this file to You under the Apache License, Version 2.0
(the "License"); you may not use this file except in compliance with
the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<head />
<body bgcolor="white">
This package provides support for atomic transactions. Transactions can
span multiple regions. Transaction writes are applied when committing a
transaction. At commit time, the transaction is examined to see if it
can be applied while still maintaining atomicity. This is done by
looking for conflicts with the transactions that committed while the
current transaction was running. This technique is known as optimistic
concurrency control (OCC) because it relies on the assumption that
transactions will mostly not have conflicts with each other.
<p>
For more details on OCC, see the paper <i> On Optimistic Methods for Concurrency Control </i>
by Kung and Robinson available
<a href="http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf">here</a>.
<p> To enable transactions, modify hbase-site.xml to turn on the
TransactionalRegionServer. This is done by setting
<i>hbase.regionserver.class</i> to
<i>org.apache.hadoop.hbase.ipc.TransactionalRegionInterface</i> and
<i>hbase.regionserver.impl</i> to
<i>org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer</i>.
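<p> A rough usage sketch (illustrative only, not part of this patch): the client obtains a
<code>TransactionState</code> from a <code>TransactionManager</code>, reads through a
transactional scanner, and finally asks the manager to commit. The table and column
family names below are hypothetical, and the <code>Scanner.next()</code> loop follows
the existing client scanner API.
<pre>
HBaseConfiguration conf = new HBaseConfiguration();
TransactionManager manager = new TransactionManager(conf);
TransactionalTable table = new TransactionalTable(conf, Bytes.toBytes("mytable"));

TransactionState state = manager.beginTransaction();
Scanner scanner = table.getScanner(state, new byte[][] { Bytes.toBytes("info:") });
try {
  RowResult row;
  while ((row = scanner.next()) != null) {
    // Reads happen inside the transaction; scanned regions join its write set.
  }
} finally {
  scanner.close();
}
manager.tryCommit(state);   // throws CommitUnsuccessfulException on conflict
</pre>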
<h3> Known Issues </h3>
Recovery in the face of region server failure
is not fully implemented. Thus, you cannot rely on the transactional
properties in the face of node failure.
<p> In order to avoid phantom reads, scanners currently
claim a <i>write set</i> for all rows in every region they scan
through. This means that if transaction A writes to a region that
transaction B is scanning, then there is a conflict (only one
transaction can be committed). This occurs even if the scanner
never passed over the row that was written.
</body>
</html>

View File

@ -0,0 +1,178 @@
/*
* $Id$
* Created on Jun 4, 2008
*
*/
package org.apache.hadoop.hbase.ipc;
import java.io.IOException;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.RowResult;
/**
* Interface for transactional region servers.
*
*/
public interface TransactionalRegionInterface extends HRegionInterface {
/** Interface version number */
public static final long versionID = 1L;
/**
* Sent to initiate a transaction.
*
* @param transactionId
* @param regionName name of region
*/
public void beginTransaction(long transactionId, final byte[] regionName)
throws IOException;
/**
* Retrieve a single value from the specified region for the specified row and
* column keys
*
* @param regionName name of region
* @param row row key
* @param column column key
* @return value for that region/row/column
* @throws IOException
*/
public Cell get(long transactionId, final byte[] regionName,
final byte[] row, final byte[] column) throws IOException;
/**
* Get the specified number of versions of the specified row and column
*
* @param regionName region name
* @param row row key
* @param column column key
* @param numVersions number of versions to return
* @return array of values
* @throws IOException
*/
public Cell[] get(long transactionId, final byte[] regionName,
final byte[] row, final byte[] column, final int numVersions)
throws IOException;
/**
* Get the specified number of versions of the specified row and column with
* the specified timestamp.
*
* @param regionName region name
* @param row row key
* @param column column key
* @param timestamp timestamp
* @param numVersions number of versions to return
* @return array of values
* @throws IOException
*/
public Cell[] get(long transactionId, final byte[] regionName,
final byte[] row, final byte[] column, final long timestamp,
final int numVersions) throws IOException;
/**
* Get all the data for the specified row at a given timestamp
*
* @param regionName region name
* @param row row key
* @return map of values
* @throws IOException
*/
public RowResult getRow(long transactionId, final byte[] regionName,
final byte[] row, final long ts) throws IOException;
/**
* Get selected columns for the specified row at a given timestamp.
*
* @param regionName region name
* @param row row key
* @return map of values
* @throws IOException
*/
public RowResult getRow(long transactionId, final byte[] regionName,
final byte[] row, final byte[][] columns, final long ts)
throws IOException;
/**
* Get selected columns for the specified row at the latest timestamp.
*
* @param regionName region name
* @param row row key
* @return map of values
* @throws IOException
*/
public RowResult getRow(long transactionId, final byte[] regionName,
final byte[] row, final byte[][] columns) throws IOException;
/**
* Delete all cells that match the passed row and whose timestamp is equal-to
* or older than the passed timestamp.
*
* @param regionName region name
* @param row row key
* @param timestamp Delete all entries that have this timestamp or older
* @throws IOException
*/
public void deleteAll(long transactionId, byte[] regionName, byte[] row,
long timestamp) throws IOException;
/**
* Opens a remote scanner with a RowFilter.
*
* @param transactionId
* @param regionName name of region to scan
* @param columns columns to scan. If column name is a column family, all
* columns of the specified column family are returned. Its also possible to
* pass a regex for column family name. A column name is judged to be regex if
* it contains at least one of the following characters:
* <code>\+|^&*$[]]}{)(</code>.
* @param startRow starting row to scan
* @param timestamp only return values whose timestamp is <= this value
* @param filter RowFilter for filtering results at the row-level.
*
* @return scannerId scanner identifier used in other calls
* @throws IOException
*/
public long openScanner(final long transactionId, final byte[] regionName,
final byte[][] columns, final byte[] startRow, long timestamp,
RowFilterInterface filter) throws IOException;
/**
* Applies a batch of updates via one RPC
*
* @param regionName name of the region to update
* @param b BatchUpdate
* @throws IOException
*/
public void batchUpdate(long transactionId, final byte[] regionName,
final BatchUpdate b) throws IOException;
/**
* Ask if we can commit the given transaction.
*
* @param transactionId
* @return true if we can commit
*/
public boolean commitRequest(final byte[] regionName, long transactionId)
throws IOException;
/**
* Commit the transaction.
*
* @param transactionId
* @throws IOException
*/
public void commit(final byte[] regionName, long transactionId)
throws IOException;
/**
* Abort the transaction.
*
* @param transactionId
* @throws IOException
*/
public void abort(final byte[] regionName, long transactionId)
throws IOException;
}

View File

@ -171,7 +171,7 @@ public class HLog implements HConstants {
* Accessor for tests.
* @return Current state of the monotonically increasing file id.
*/
long getFilenum() {
public long getFilenum() {
return this.filenum;
}
@ -204,6 +204,10 @@ public class HLog implements HConstants {
}
}
}
public long getSequenceNumber() {
return logSeqNum;
}
/**
* Roll the log writer. That is, start writing log messages to a new file.
@ -311,7 +315,7 @@ public class HLog implements HConstants {
* This is a convenience method that computes a new filename with a given
* file-number.
*/
Path computeFilename(final long fn) {
public Path computeFilename(final long fn) {
return new Path(dir, HLOG_DATFILE + fn);
}
@ -330,7 +334,7 @@ public class HLog implements HConstants {
*
* @throws IOException
*/
void close() throws IOException {
public void close() throws IOException {
cacheFlushLock.lock();
try {
synchronized (updateLock) {
@ -391,13 +395,8 @@ public class HLog implements HConstants {
new HLogKey(regionName, tableName, key.getRow(), seqNum[counter++]);
HLogEdit logEdit =
new HLogEdit(key.getColumn(), es.getValue(), key.getTimestamp());
try {
this.writer.append(logKey, logEdit);
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of log", e);
requestLogRoll();
throw e;
}
doWrite(logKey, logEdit);
this.numEntries++;
}
}
@ -411,6 +410,63 @@ public class HLog implements HConstants {
this.listener.logRollRequested();
}
}
private void doWrite(HLogKey logKey, HLogEdit logEdit) throws IOException {
try {
this.writer.append(logKey, logEdit);
} catch (IOException e) {
LOG.fatal("Could not append. Requesting close of log", e);
requestLogRoll();
throw e;
}
}
/** Append an entry without a row to the log.
*
* @param regionInfo
* @param logEdit
* @throws IOException
*/
public void append(HRegionInfo regionInfo, HLogEdit logEdit) throws IOException {
this.append(regionInfo, new byte[0], logEdit);
}
/** Append an entry to the log.
*
* @param regionInfo
* @param row
* @param logEdit
* @throws IOException
*/
public void append(HRegionInfo regionInfo, byte [] row, HLogEdit logEdit) throws IOException {
if (closed) {
throw new IOException("Cannot append; log is closed");
}
byte [] regionName = regionInfo.getRegionName();
byte [] tableName = regionInfo.getTableDesc().getName();
synchronized (updateLock) {
long seqNum = obtainSeqNum();
// The 'lastSeqWritten' map holds the sequence number of the oldest
// write for each region. When the cache is flushed, the entry for the
// region being flushed is removed if the sequence number of the flush
// is greater than or equal to the value in lastSeqWritten.
if (!this.lastSeqWritten.containsKey(regionName)) {
this.lastSeqWritten.put(regionName, Long.valueOf(seqNum));
}
HLogKey logKey = new HLogKey(regionName, tableName, row, seqNum);
doWrite(logKey, logEdit);
this.numEntries++;
}
if (this.numEntries > this.maxlogentries) {
if (listener != null) {
listener.logRollRequested();
}
}
}
/** @return How many items have been added to the log */
int getNumEntries() {
@ -508,6 +564,10 @@ public class HLog implements HConstants {
this.cacheFlushLock.unlock();
}
public static boolean isMetaColumn(byte [] column) {
return Bytes.equals(METACOLUMN, column);
}
/**
* Split up a bunch of log files, that are no longer being written to, into
* new files, one per region. Delete the old log files when finished.

View File

@ -19,6 +19,7 @@
*/
package org.apache.hadoop.hbase.regionserver;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.*;
@ -62,11 +63,20 @@ public class HLogEdit implements Writable, HConstants {
public static boolean isDeleted(final byte [] value) {
return (value == null)? false: deleteBytes.compareTo(value) == 0;
}
public enum TransactionalOperation {
START, WRITE, COMMIT, ABORT
}
private byte [] column;
private byte [] val;
private long timestamp;
private static final int MAX_VALUE_LEN = 128;
private boolean isTransactionEntry;
private Long transactionId = null;
private TransactionalOperation operation;
/**
* Default constructor used by Writable
@ -85,6 +95,34 @@ public class HLogEdit implements Writable, HConstants {
this.column = c;
this.val = bval;
this.timestamp = timestamp;
this.isTransactionEntry = false;
}
/** Construct a WRITE transaction.
*
* @param transactionId
* @param op
* @param timestamp
*/
public HLogEdit(long transactionId, BatchOperation op, long timestamp) {
this(op.getColumn(), op.getValue(), timestamp);
// This covers delete ops too...
this.transactionId = transactionId;
this.operation = TransactionalOperation.WRITE;
this.isTransactionEntry = true;
}
/** Construct a transactional operation marker (START, COMMIT, or ABORT).
*
* @param transactionId
* @param op
*/
public HLogEdit(long transactionId, TransactionalOperation op) {
this.column = new byte[0];
this.val = new byte[0];
this.transactionId = transactionId;
this.operation = op;
this.isTransactionEntry = true;
}
/** @return the column */
@ -101,6 +139,28 @@ public class HLogEdit implements Writable, HConstants {
public long getTimestamp() {
return this.timestamp;
}
public boolean isTransactionEntry() {
return isTransactionEntry;
}
/**
* Get the transactionId, or null if this is not a transactional edit.
*
* @return Return the transactionId.
*/
public Long getTransactionId() {
return transactionId;
}
/**
* Get the operation.
*
* @return Return the operation.
*/
public TransactionalOperation getOperation() {
return operation;
}
/**
* @return First column name, timestamp, and first 128 bytes of the value
@ -117,8 +177,13 @@ public class HLogEdit implements Writable, HConstants {
} catch (UnsupportedEncodingException e) {
throw new RuntimeException("UTF8 encoding not present?", e);
}
return "(" + Bytes.toString(getColumn()) + "/" + getTimestamp() + "/" +
value + ")";
return "("
+ Bytes.toString(getColumn())
+ "/"
+ getTimestamp()
+ "/"
+ (isTransactionEntry ? "tran: " + transactionId + " op "
+ operation.toString() +"/": "") + value + ")";
}
// Writable
@ -126,9 +191,18 @@ public class HLogEdit implements Writable, HConstants {
/** {@inheritDoc} */
public void write(DataOutput out) throws IOException {
Bytes.writeByteArray(out, this.column);
out.writeInt(this.val.length);
out.write(this.val);
if (this.val == null) {
out.writeInt(0);
} else {
out.writeInt(this.val.length);
out.write(this.val);
}
out.writeLong(timestamp);
out.writeBoolean(isTransactionEntry);
if (isTransactionEntry) {
out.writeLong(transactionId);
out.writeUTF(operation.name());
}
}
/** {@inheritDoc} */
@ -137,5 +211,10 @@ public class HLogEdit implements Writable, HConstants {
this.val = new byte[in.readInt()];
in.readFully(this.val);
this.timestamp = in.readLong();
isTransactionEntry = in.readBoolean();
if (isTransactionEntry) {
transactionId = in.readLong();
operation = TransactionalOperation.valueOf(in.readUTF());
}
}
}

View File

@ -30,6 +30,8 @@ import java.io.*;
* The log intermingles edits to many tables and rows, so each log entry
* identifies the appropriate table and row. Within a table and row, they're
* also sorted.
*
* Some transactional edits (START, COMMIT, ABORT) will not have an associated row.
*/
public class HLogKey implements WritableComparable {
private byte [] regionName;
@ -64,19 +66,19 @@ public class HLogKey implements WritableComparable {
// A bunch of accessors
//////////////////////////////////////////////////////////////////////////////
byte [] getRegionName() {
public byte [] getRegionName() {
return regionName;
}
byte [] getTablename() {
public byte [] getTablename() {
return tablename;
}
byte [] getRow() {
public byte [] getRow() {
return row;
}
long getLogSeqNum() {
public long getLogSeqNum() {
return logSeqNum;
}

View File

@ -243,8 +243,8 @@ public class HRegion implements HConstants {
LOG.debug("Files for new region");
listPaths(fs, newRegionDir);
}
HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo,
null, null);
HRegion dstRegion = new HRegion(basedir, log, fs, conf, newRegionInfo, null);
dstRegion.initialize(null, null);
dstRegion.compactStores();
if (LOG.isDebugEnabled()) {
LOG.debug("Files for new region");
@ -318,7 +318,7 @@ public class HRegion implements HConstants {
private final Map<Integer, TreeMap<HStoreKey, byte []>> targetColumns =
new ConcurrentHashMap<Integer, TreeMap<HStoreKey, byte []>>();
// Default access because read by tests.
final Map<Integer, HStore> stores = new ConcurrentHashMap<Integer, HStore>();
protected final Map<Integer, HStore> stores = new ConcurrentHashMap<Integer, HStore>();
final AtomicLong memcacheSize = new AtomicLong(0);
final Path basedir;
@ -376,7 +376,7 @@ public class HRegion implements HConstants {
private final ReentrantReadWriteLock updatesLock =
new ReentrantReadWriteLock();
private final Integer splitLock = new Integer(0);
private final long minSequenceId;
private long minSequenceId;
final AtomicInteger activeScannerCount = new AtomicInteger(0);
//////////////////////////////////////////////////////////////////////////////
@ -395,31 +395,6 @@ public class HRegion implements HConstants {
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param fs is the filesystem.
* @param conf is global configuration settings.
* @param regionInfo - HRegionInfo that describes the region
* @param initialFiles If there are initial files (implying that the HRegion
* is new), then read them from the supplied path.
* @param flushListener an object that implements CacheFlushListener or null
* or null
* @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
HRegionInfo regionInfo, Path initialFiles,
FlushRequester flushListener) throws IOException {
this(basedir, log, fs, conf, regionInfo, initialFiles, flushListener, null);
}
/**
* HRegion constructor.
*
* @param log The HLog is the outbound log for any updates to the HRegion
* (There's a single HLog for all the HRegions on a single HRegionServer.)
* The log file is a logfile from the previous execution that's
* custom-computed for this HRegion. The HRegionServer computes and sorts the
* appropriate log info for this HRegion. If there is a previous log file
* (implying that the HRegion has been written-to before), then read it from
* the supplied path.
* @param basedir qualified path of directory where region should be located,
* usually the table directory.
* @param fs is the filesystem.
@ -434,10 +409,8 @@ public class HRegion implements HConstants {
* @throws IOException
*/
public HRegion(Path basedir, HLog log, FileSystem fs, HBaseConfiguration conf,
HRegionInfo regionInfo, Path initialFiles,
FlushRequester flushListener, final Progressable reporter)
throws IOException {
HRegionInfo regionInfo,
FlushRequester flushListener) {
this.basedir = basedir;
this.log = log;
this.fs = fs;
@ -447,7 +420,6 @@ public class HRegion implements HConstants {
this.threadWakeFrequency = conf.getLong(THREAD_WAKE_FREQUENCY, 10 * 1000);
String encodedNameStr = Integer.toString(this.regionInfo.getEncodedName());
this.regiondir = new Path(basedir, encodedNameStr);
Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
this.historian = RegionHistorian.getInstance();
if (LOG.isDebugEnabled()) {
@ -457,6 +429,27 @@ public class HRegion implements HConstants {
this.regionCompactionDir =
new Path(getCompactionDir(basedir), encodedNameStr);
int flushSize = regionInfo.getTableDesc().getMemcacheFlushSize();
if (flushSize == HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE) {
flushSize = conf.getInt("hbase.hregion.memcache.flush.size",
HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE);
}
this.memcacheFlushSize = flushSize;
this.blockingMemcacheSize = this.memcacheFlushSize *
conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
}
/** Initialize this region and get it ready to roll.
*
* @param initialFiles
* @param reporter
* @throws IOException
*/
public void initialize( Path initialFiles,
final Progressable reporter) throws IOException {
Path oldLogFile = new Path(regiondir, HREGION_OLDLOGFILE_NAME);
// Move prefab HStore files into place (if any). This picks up split files
// and any merges from splits and merges dirs.
@ -466,16 +459,20 @@ public class HRegion implements HConstants {
// Load in all the HStores.
long maxSeqId = -1;
long minSeqId = Integer.MAX_VALUE;
for (HColumnDescriptor c : this.regionInfo.getTableDesc().getFamilies()) {
HStore store = instantiateHStore(this.basedir, c, oldLogFile, reporter);
stores.put(Bytes.mapKey(c.getName()), store);
long storeSeqId = store.getMaxSequenceId();
if (storeSeqId > maxSeqId) {
maxSeqId = storeSeqId;
}
if (storeSeqId < minSeqId) {
minSeqId = storeSeqId;
}
}
doReconstructionLog(oldLogFile, maxSeqId, reporter);
doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
if (fs.exists(oldLogFile)) {
if (LOG.isDebugEnabled()) {
@ -501,17 +498,6 @@ public class HRegion implements HConstants {
if (fs.exists(merges)) {
fs.delete(merges, true);
}
int flushSize = regionInfo.getTableDesc().getMemcacheFlushSize();
if (flushSize == HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE) {
flushSize = conf.getInt("hbase.hregion.memcache.flush.size",
HTableDescriptor.DEFAULT_MEMCACHE_FLUSH_SIZE);
}
this.memcacheFlushSize = flushSize;
this.blockingMemcacheSize = this.memcacheFlushSize *
conf.getInt("hbase.hregion.memcache.block.multiplier", 1);
// See if region is meant to run read-only.
if (this.regionInfo.getTableDesc().isReadOnly()) {
this.writestate.setReadOnly(true);
@ -797,10 +783,12 @@ public class HRegion implements HConstants {
// Opening the region copies the splits files from the splits directory
// under each region.
HRegion regionA =
new HRegion(basedir, log, fs, conf, regionAInfo, dirA, null);
new HRegion(basedir, log, fs, conf, regionAInfo, null);
regionA.initialize(dirA, null);
regionA.close();
HRegion regionB =
new HRegion(basedir, log, fs, conf, regionBInfo, dirB, null);
new HRegion(basedir, log, fs, conf, regionBInfo, null);
regionB.initialize(dirB, null);
regionB.close();
// Cleanup
@ -1029,12 +1017,14 @@ public class HRegion implements HConstants {
// again so its value will represent the size of the updates received
// during the flush
long sequenceId = -1L;
long completeSequenceId = -1L;
this.updatesLock.writeLock().lock();
try {
for (HStore s: stores.values()) {
s.snapshot();
}
sequenceId = log.startCacheFlush();
completeSequenceId = this.getCompleteCacheFlushSequenceId(sequenceId);
this.memcacheSize.set(0);
} finally {
this.updatesLock.writeLock().unlock();
@ -1050,7 +1040,7 @@ public class HRegion implements HConstants {
// Keep running vector of all store files that includes both old and the
// just-made new flush store file.
for (HStore hstore: stores.values()) {
boolean needsCompaction = hstore.flushCache(sequenceId);
boolean needsCompaction = hstore.flushCache(completeSequenceId);
if (needsCompaction) {
compactionRequested = true;
}
@ -1077,7 +1067,7 @@ public class HRegion implements HConstants {
// and that all updates to the log for this regionName that have lower
// log-sequence-ids can be safely ignored.
this.log.completeCacheFlush(getRegionName(),
regionInfo.getTableDesc().getName(), sequenceId);
regionInfo.getTableDesc().getName(), completeSequenceId);
// C. Finally notify anyone waiting on memcache to clear:
// e.g. checkResources().
@ -1099,6 +1089,18 @@ public class HRegion implements HConstants {
return compactionRequested;
}
/**
* Get the sequence number to be associated with this cache flush. Overridden by
* TransactionalRegion so that pending transactions are not completed by the flush.
*
* @param currentSequenceId
* @return sequence id to complete the cache flush with
*/
protected long getCompleteCacheFlushSequenceId(long currentSequenceId) {
return currentSequenceId;
}
//////////////////////////////////////////////////////////////////////////////
// get() methods for client use.
//////////////////////////////////////////////////////////////////////////////
@ -1346,7 +1348,33 @@ public class HRegion implements HConstants {
* @param b
* @throws IOException
*/
public void batchUpdate(BatchUpdate b, Integer lockid)
public void batchUpdate(BatchUpdate b) throws IOException {
this.batchUpdate(b, null, true);
}
/**
* @param b
* @throws IOException
*/
public void batchUpdate(BatchUpdate b, boolean writeToWAL) throws IOException {
this.batchUpdate(b, null, writeToWAL);
}
/**
* @param b
* @throws IOException
*/
public void batchUpdate(BatchUpdate b, Integer lockid) throws IOException {
this.batchUpdate(b, lockid, true);
}
/**
* @param b
* @param writeToWAL if true, then we write this update to the log
* @throws IOException
*/
public void batchUpdate(BatchUpdate b, Integer lockid, boolean writeToWAL)
throws IOException {
checkReadOnly();
@ -1395,7 +1423,7 @@ public class HRegion implements HConstants {
this.targetColumns.remove(lid);
if (edits != null && edits.size() > 0) {
update(edits);
update(edits, writeToWAL);
}
if (deletes != null && deletes.size() > 0) {
@ -1597,16 +1625,25 @@ public class HRegion implements HConstants {
}
targets.put(key, val);
}
/*
/**
* Add updates first to the hlog and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
* @param row Row to update.
* @param timestamp Timestamp to record the updates against
* @param updatesByColumn Cell updates by column
* @throws IOException
*/
private void update(final TreeMap<HStoreKey, byte []> updatesByColumn)
private void update(final TreeMap<HStoreKey, byte []> updatesByColumn) throws IOException {
this.update(updatesByColumn, true);
}
/**
* Add updates first to the hlog (if writeToWAL) and then add values to memcache.
* Warning: Assumption is caller has lock on passed in row.
* @param writeToWAL if true, then we should write to the log
* @param updatesByColumn Cell updates by column
* @throws IOException
*/
private void update(final TreeMap<HStoreKey, byte []> updatesByColumn, boolean writeToWAL)
throws IOException {
if (updatesByColumn == null || updatesByColumn.size() <= 0) {
return;
@@ -1615,8 +1652,10 @@ public class HRegion implements HConstants {
boolean flush = false;
this.updatesLock.readLock().lock();
try {
this.log.append(regionInfo.getRegionName(),
regionInfo.getTableDesc().getName(), updatesByColumn);
if (writeToWAL) {
this.log.append(regionInfo.getRegionName(), regionInfo.getTableDesc()
.getName(), updatesByColumn);
}
long size = 0;
for (Map.Entry<HStoreKey, byte[]> e: updatesByColumn.entrySet()) {
HStoreKey key = e.getKey();
@@ -1660,7 +1699,7 @@ public class HRegion implements HConstants {
// Do any reconstruction needed from the log
@SuppressWarnings("unused")
protected void doReconstructionLog(Path oldLogFile, long maxSeqId,
protected void doReconstructionLog(Path oldLogFile, long minSeqId, long maxSeqId,
Progressable reporter)
throws UnsupportedEncodingException, IOException {
// Nothing to do (Replaying is done in HStores)
@@ -2105,9 +2144,11 @@ public class HRegion implements HConstants {
if (!info.isMetaRegion()) {
RegionHistorian.getInstance().addRegionCreation(info);
}
return new HRegion(tableDir,
new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
fs, conf, info, null, null);
HRegion region = new HRegion(tableDir,
new HLog(fs, new Path(regionDir, HREGION_LOGDIR_NAME), conf, null),
fs, conf, info, null);
region.initialize(null, null);
return region;
}
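// Note on the construction pattern above (introduced by this patch): HRegion
// creation is now a two-step constructor-plus-initialize sequence, e.g.
//
//   HRegion r = new HRegion(tableDir, log, fs, conf, info, null);
//   r.initialize(null, null); // no initial-files dir, no progress reporter
//
// presumably so that subclasses such as TransactionalRegion can finish setting
// up their own state before reconstruction-log replay runs inside initialize().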
/**
@@ -2134,7 +2175,8 @@ public class HRegion implements HConstants {
}
HRegion r = new HRegion(
HTableDescriptor.getTableDir(rootDir, info.getTableDesc().getName()),
log, FileSystem.get(conf), conf, info, null, null);
log, FileSystem.get(conf), conf, info, null);
r.initialize(null, null);
if (log != null) {
log.setSequenceNumber(r.getMinSequenceId());
}

View File

@@ -900,13 +900,15 @@ public class HRegionServer implements HConstants, HRegionInterface, Runnable {
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
return new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
.getTableDesc().getName()), this.log, this.fs, conf, regionInfo, null,
this.cacheFlusher, new Progressable() {
public void progress() {
addProcessingMessage(regionInfo);
}
});
HRegion r = new HRegion(HTableDescriptor.getTableDir(rootDir, regionInfo
.getTableDesc().getName()), this.log, this.fs, conf, regionInfo,
this.cacheFlusher);
r.initialize(null, new Progressable() {
public void progress() {
addProcessingMessage(regionInfo);
}
});
return r;
}
/*

View File

@@ -311,7 +311,7 @@ public class HStore implements HConstants {
// Check this edit is for me. Also, guard against writing
// METACOLUMN info such as HBASE::CACHEFLUSH entries
byte [] column = val.getColumn();
if (Bytes.equals(column, HLog.METACOLUMN)
if (val.isTransactionEntry() || Bytes.equals(column, HLog.METACOLUMN)
|| !Bytes.equals(key.getRegionName(), info.getRegionName())
|| !HStoreKey.matchingFamily(family.getName(), column)) {
continue;
@@ -1316,8 +1316,7 @@ public class HStore implements HConstants {
* @return Matching keys.
* @throws IOException
*/
List<HStoreKey> getKeys(final HStoreKey origin, final int versions,
final long now)
public List<HStoreKey> getKeys(final HStoreKey origin, final int versions, final long now)
throws IOException {
// This code below is very close to the body of the get method. Any
// changes in the flow below should also probably be done in get. TODO:

View File

@@ -0,0 +1,53 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.transactional;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.hbase.Chore;
import org.apache.hadoop.hbase.regionserver.HRegion;
/**
* Cleans up committed transactions when they are no longer needed to verify
* pending transactions.
*/
class CleanOldTransactionsChore extends Chore {
private static final String SLEEP_CONF = "hbase.transaction.clean.sleep";
private static final int DEFAULT_SLEEP = 60 * 1000;
private final TransactionalRegionServer regionServer;
public CleanOldTransactionsChore(
final TransactionalRegionServer regionServer,
final AtomicBoolean stopRequest) {
super(regionServer.getConfiguration().getInt(SLEEP_CONF, DEFAULT_SLEEP),
stopRequest);
this.regionServer = regionServer;
}
@Override
protected void chore() {
for (HRegion region : regionServer.getOnlineRegions()) {
((TransactionalRegion) region).removeUnNeededCommitedTransactions();
}
}
}
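// Illustrative sketch (not part of this patch): the chore interval is read from
// "hbase.transaction.clean.sleep" in milliseconds, so a deployment could shorten
// the 60-second default before starting the server, e.g.
//
//   HBaseConfiguration conf = new HBaseConfiguration();
//   conf.setInt("hbase.transaction.clean.sleep", 30 * 1000); // assumed 30s interval
//   TransactionalRegionServer server = new TransactionalRegionServer(conf);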

View File

@@ -0,0 +1,296 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.transactional;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeMap;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Holds the state of a transaction.
*/
class TransactionState {
private static final Log LOG = LogFactory.getLog(TransactionState.class);
/** Current status. */
public enum Status {
/** Initial status, still performing operations. */
PENDING,
/**
* Checked if we can commit, and said yes. Still need to determine the
* global decision.
*/
COMMIT_PENDING,
/** Committed. */
COMMITED,
/** Aborted. */
ABORTED
}
private final long hLogStartSequenceId;
private final long transactionId;
private Status status;
private SortedSet<byte[]> readSet = new TreeSet<byte[]>(
Bytes.BYTES_COMPARATOR);
private List<BatchUpdate> writeSet = new LinkedList<BatchUpdate>();
private Set<TransactionState> transactionsToCheck = new HashSet<TransactionState>();
private int startSequenceNumber;
private Integer sequenceNumber;
boolean hasScan = false;
public TransactionState(final long transactionId,
final long rLogStartSequenceId) {
this.transactionId = transactionId;
this.hLogStartSequenceId = rLogStartSequenceId;
this.status = Status.PENDING;
}
public void addRead(final byte[] rowKey) {
readSet.add(rowKey);
}
public Set<byte[]> getReadSet() {
return readSet;
}
public void addWrite(final BatchUpdate write) {
writeSet.add(write);
}
public List<BatchUpdate> getWriteSet() {
return writeSet;
}
/**
* GetFull from the writeSet.
*
* @param row
* @param columns
* @param timestamp
* @return Map of column to Cell written by this transaction for the row, or null if none
*/
public Map<byte[], Cell> localGetFull(final byte[] row,
final Set<byte[]> columns, final long timestamp) {
Map<byte[], Cell> results = new TreeMap<byte[], Cell>(
Bytes.BYTES_COMPARATOR); // Must use the Bytes comparator so byte[] keys compare by content
for (BatchUpdate b : writeSet) {
if (!Bytes.equals(row, b.getRow())) {
continue;
}
if (b.getTimestamp() > timestamp) {
continue;
}
for (BatchOperation op : b) {
if (!op.isPut()
|| (columns != null && !columns.contains(op.getColumn()))) {
continue;
}
results.put(op.getColumn(), new Cell(op.getValue(), b.getTimestamp()));
}
}
return results.size() == 0 ? null : results;
}
/**
* Get from the writeSet.
*
* @param row
* @param column
* @param timestamp
* @return Cells written by this transaction, newest first, or null if none
*/
public Cell[] localGet(final byte[] row, final byte[] column,
final long timestamp) {
ArrayList<Cell> results = new ArrayList<Cell>();
// Go in reverse order to put newest updates first in list
for (int i = writeSet.size() - 1; i >= 0; i--) {
BatchUpdate b = writeSet.get(i);
if (!Bytes.equals(row, b.getRow())) {
continue;
}
if (b.getTimestamp() > timestamp) {
continue;
}
for (BatchOperation op : b) {
if (!op.isPut() || !Bytes.equals(column, op.getColumn())) {
continue;
}
results.add(new Cell(op.getValue(), b.getTimestamp()));
}
}
return results.size() == 0 ? null : results
.toArray(new Cell[results.size()]);
}
public void addTransactionToCheck(final TransactionState transaction) {
transactionsToCheck.add(transaction);
}
public boolean hasConflict() {
for (TransactionState transactionState : transactionsToCheck) {
if (hasConflict(transactionState)) {
return true;
}
}
return false;
}
private boolean hasConflict(final TransactionState checkAgainst) {
if (checkAgainst.getStatus().equals(TransactionState.Status.ABORTED)) {
return false; // Cannot conflict with aborted transactions
}
for (BatchUpdate otherUpdate : checkAgainst.getWriteSet()) {
if (this.hasScan) {
LOG.info("Transaction" + this.toString()
+ " has a scan read. Meanwile a write occured. "
+ "Conservitivly reporting conflict");
return true;
}
if (this.getReadSet().contains(otherUpdate.getRow())) {
LOG.trace("Transaction " + this.toString() + " conflicts with "
+ checkAgainst.toString());
return true;
}
}
return false;
}
/**
* Get the status.
*
* @return Return the status.
*/
public Status getStatus() {
return status;
}
/**
* Set the status.
*
* @param status The status to set.
*/
public void setStatus(final Status status) {
this.status = status;
}
/**
* Get the startSequenceNumber.
*
* @return Return the startSequenceNumber.
*/
public int getStartSequenceNumber() {
return startSequenceNumber;
}
/**
* Set the startSequenceNumber.
*
* @param startSequenceNumber The startSequenceNumber to set.
*/
public void setStartSequenceNumber(final int startSequenceNumber) {
this.startSequenceNumber = startSequenceNumber;
}
/**
* Get the sequenceNumber.
*
* @return Return the sequenceNumber.
*/
public Integer getSequenceNumber() {
return sequenceNumber;
}
/**
* Set the sequenceNumber.
*
* @param sequenceNumber The sequenceNumber to set.
*/
public void setSequenceNumber(final Integer sequenceNumber) {
this.sequenceNumber = sequenceNumber;
}
@Override
public String toString() {
StringBuilder result = new StringBuilder();
result.append("[transactionId: ");
result.append(transactionId);
result.append(" status: ");
result.append(status.name());
result.append(" read Size: ");
result.append(readSet.size());
result.append(" write Size: ");
result.append(writeSet.size());
result.append(" startSQ: ");
result.append(startSequenceNumber);
if (sequenceNumber != null) {
result.append(" commitedSQ:");
result.append(sequenceNumber);
}
result.append("]");
return result.toString();
}
/**
* Get the transactionId.
*
* @return Return the transactionId.
*/
public long getTransactionId() {
return transactionId;
}
/**
* Get the hLogStartSequenceId.
*
* @return Return the hLogStartSequenceId.
*/
public long getHLogStartSequenceId() {
return hLogStartSequenceId;
}
/**
* Set the hasScan.
*
* @param hasScan The hasScan to set.
*/
public void setHasScan(final boolean hasScan) {
this.hasScan = hasScan;
}
}

View File

@@ -0,0 +1,260 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.transactional;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.io.BatchOperation;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.HLogEdit;
import org.apache.hadoop.hbase.regionserver.HLogKey;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.util.Progressable;
/**
* Responsible for writing and reading (recovering) transactional information
* to/from the HLog.
*
*
*/
class TransactionalHLogManager {
private static final Log LOG = LogFactory
.getLog(TransactionalHLogManager.class);
private final HLog hlog;
private final FileSystem fileSystem;
private final HRegionInfo regionInfo;
private final HBaseConfiguration conf;
public TransactionalHLogManager(final TransactionalRegion region) {
this.hlog = region.getLog();
this.fileSystem = region.getFilesystem();
this.regionInfo = region.getRegionInfo();
this.conf = region.getConf();
}
// For Testing
TransactionalHLogManager(final HLog hlog, final FileSystem fileSystem,
final HRegionInfo regionInfo, final HBaseConfiguration conf) {
this.hlog = hlog;
this.fileSystem = fileSystem;
this.regionInfo = regionInfo;
this.conf = conf;
}
public void writeStartToLog(final long transactionId) throws IOException {
HLogEdit logEdit;
logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.START);
hlog.append(regionInfo, logEdit);
}
public void writeUpdateToLog(final long transactionId,
final BatchUpdate update) throws IOException {
long commitTime = update.getTimestamp() == HConstants.LATEST_TIMESTAMP ? System
.currentTimeMillis()
: update.getTimestamp();
for (BatchOperation op : update) {
HLogEdit logEdit = new HLogEdit(transactionId, op, commitTime);
hlog.append(regionInfo, update.getRow(), logEdit);
}
}
public void writeCommitToLog(final long transactionId) throws IOException {
HLogEdit logEdit;
logEdit = new HLogEdit(transactionId,
HLogEdit.TransactionalOperation.COMMIT);
hlog.append(regionInfo, logEdit);
}
public void writeAbortToLog(final long transactionId) throws IOException {
HLogEdit logEdit;
logEdit = new HLogEdit(transactionId, HLogEdit.TransactionalOperation.ABORT);
hlog.append(regionInfo, logEdit);
}
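// Illustrative sketch (not part of this patch): for a committing transaction the
// expected append order is one START record, one WRITE record per BatchOperation,
// and finally a COMMIT (or ABORT) record:
//
//   logManager.writeStartToLog(transactionId);
//   logManager.writeUpdateToLog(transactionId, batchUpdate); // one WRITE per operation
//   logManager.writeCommitToLog(transactionId);              // or writeAbortToLog(...)
//
// getCommitsFromLog() below replays this sequence and only returns transactions
// whose COMMIT record was found.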
public Map<Long, List<BatchUpdate>> getCommitsFromLog(
final Path reconstructionLog, final long maxSeqID,
final Progressable reporter) throws UnsupportedEncodingException,
IOException {
if (reconstructionLog == null || !fileSystem.exists(reconstructionLog)) {
// Nothing to do.
return null;
}
// Check its not empty.
FileStatus[] stats = fileSystem.listStatus(reconstructionLog);
if (stats == null || stats.length == 0) {
LOG.warn("Passed reconstruction log " + reconstructionLog
+ " is zero-length");
return null;
}
SortedMap<Long, List<BatchUpdate>> pendingTransactionsById = new TreeMap<Long, List<BatchUpdate>>();
SortedMap<Long, List<BatchUpdate>> commitedTransactionsById = new TreeMap<Long, List<BatchUpdate>>();
Set<Long> abortedTransactions = new HashSet<Long>();
SequenceFile.Reader logReader = new SequenceFile.Reader(fileSystem,
reconstructionLog, conf);
try {
HLogKey key = new HLogKey();
HLogEdit val = new HLogEdit();
long skippedEdits = 0;
long totalEdits = 0;
long startCount = 0;
long writeCount = 0;
long abortCount = 0;
long commitCount = 0;
// How many edits to apply before we send a progress report.
int reportInterval = conf.getInt("hbase.hstore.report.interval.edits",
2000);
while (logReader.next(key, val)) {
LOG.debug("Processing edit: key: " + key.toString() + " val: "
+ val.toString());
if (key.getLogSeqNum() < maxSeqID) {
skippedEdits++;
continue;
}
// Check this edit is for me.
byte[] column = val.getColumn();
Long transactionId = val.getTransactionId();
if (!val.isTransactionEntry() || HLog.isMetaColumn(column)
|| !Bytes.equals(key.getRegionName(), regionInfo.getRegionName())) {
continue;
}
List<BatchUpdate> updates = pendingTransactionsById.get(transactionId);
switch (val.getOperation()) {
case START:
if (updates != null || abortedTransactions.contains(transactionId)
|| commitedTransactionsById.containsKey(transactionId)) {
LOG.error("Processing start for transaction: " + transactionId
+ ", but have already seen start message");
throw new IOException("Corrupted transaction log");
}
updates = new LinkedList<BatchUpdate>();
pendingTransactionsById.put(transactionId, updates);
startCount++;
break;
case WRITE:
if (updates == null) {
LOG.error("Processing edit for transaction: " + transactionId
+ ", but have not seen start message");
throw new IOException("Corrupted transaction log");
}
BatchUpdate tranUpdate = new BatchUpdate(key.getRow());
if (val.getVal() != null) {
tranUpdate.put(val.getColumn(), val.getVal());
} else {
tranUpdate.delete(val.getColumn());
}
updates.add(tranUpdate);
writeCount++;
break;
case ABORT:
if (updates == null) {
LOG.error("Processing abort for transaction: " + transactionId
+ ", but have not seen start message");
throw new IOException("Corrupted transaction log");
}
abortedTransactions.add(transactionId);
pendingTransactionsById.remove(transactionId);
abortCount++;
break;
case COMMIT:
if (updates == null) {
LOG.error("Processing commit for transaction: " + transactionId
+ ", but have not seen start message");
throw new IOException("Corrupted transaction log");
}
if (abortedTransactions.contains(transactionId)) {
LOG.error("Processing commit for transaction: " + transactionId
+ ", but also have abort message");
throw new IOException("Corrupted transaction log");
}
if (updates.size() == 0) {
LOG
.warn("Transaciton " + transactionId
+ " has no writes in log. ");
}
if (commitedTransactionsById.containsKey(transactionId)) {
LOG.error("Processing commit for transaction: " + transactionId
+ ", but have already commited transaction with that id");
throw new IOException("Corrupted transaction log");
}
pendingTransactionsById.remove(transactionId);
commitedTransactionsById.put(transactionId, updates);
commitCount++;
}
totalEdits++;
if (reporter != null && (totalEdits % reportInterval) == 0) {
reporter.progress();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("Read " + totalEdits + " tranasctional operations (skipped "
+ skippedEdits + " because sequence id <= " + maxSeqID + "): "
+ startCount + " starts, " + writeCount + " writes, " + abortCount
+ " aborts, and " + commitCount + " commits.");
}
} finally {
logReader.close();
}
if (pendingTransactionsById.size() > 0) {
LOG
.info("Region log has "
+ pendingTransactionsById.size()
+ " unfinished transactions. Going to the transaction log to resolve");
throw new RuntimeException("Transaction log not yet implemented");
}
return commitedTransactionsById;
}
}

View File

@@ -0,0 +1,673 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.transactional;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.Map.Entry;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HStoreKey;
import org.apache.hadoop.hbase.LeaseException;
import org.apache.hadoop.hbase.LeaseListener;
import org.apache.hadoop.hbase.Leases;
import org.apache.hadoop.hbase.Leases.LeaseStillHeldException;
import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HStore;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionState.Status;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.util.Progressable;
/**
* Region which provides transactional support for atomic transactions. This is
* achieved with optimistic concurrency control (see
* http://www.seas.upenn.edu/~zives/cis650/papers/opt-cc.pdf). We keep track of
* read and write sets for each transaction, and hold off on processing the
* writes. To decide whether to commit a transaction, we check its read set for
* overlaps against the write sets of all transactions that committed while it
* was running.
* <p>
* Because transactions can span multiple regions, all regions must agree to
* commit a transaction. The client side of this commit protocol is encoded in
* org.apache.hadoop.hbase.client.transactional.TransactionManager
* <p>
* In the event of a failure of the client mid-commit (after we voted yes), we
* will have to consult the transaction log to determine the final decision of
* the transaction. This is not yet implemented.
*/
class TransactionalRegion extends HRegion {
private static final String LEASE_TIME = "hbase.transaction.leaseTime";
private static final int DEFAULT_LEASE_TIME = 60 * 1000;
private static final int LEASE_CHECK_FREQUENCY = 1000;
private static final String OLD_TRANSACTION_FLUSH = "hbase.transaction.flush";
private static final int DEFAULT_OLD_TRANSACTION_FLUSH = 100; // Do a flush if we have this many old transactions.
private static final Log LOG = LogFactory.getLog(TransactionalRegion.class);
// Collection of active transactions (PENDING) keyed by id.
private Map<String, TransactionState> transactionsById = new HashMap<String, TransactionState>();
// Map of recent transactions that are COMMIT_PENDING or COMMITED keyed by
// their sequence number
private SortedMap<Integer, TransactionState> commitedTransactionsBySequenceNumber = Collections
.synchronizedSortedMap(new TreeMap<Integer, TransactionState>());
// Collection of transactions that are COMMIT_PENDING
private Set<TransactionState> commitPendingTransactions = Collections
.synchronizedSet(new HashSet<TransactionState>());
private final Leases transactionLeases;
private AtomicInteger nextSequenceId = new AtomicInteger(0);
private Object commitCheckLock = new Object();
private TransactionalHLogManager logManager;
private final int oldTransactionFlushTrigger;
public TransactionalRegion(final Path basedir, final HLog log,
final FileSystem fs, final HBaseConfiguration conf,
final HRegionInfo regionInfo, final FlushRequester flushListener) {
super(basedir, log, fs, conf, regionInfo, flushListener);
transactionLeases = new Leases(conf.getInt(LEASE_TIME, DEFAULT_LEASE_TIME),
LEASE_CHECK_FREQUENCY);
logManager = new TransactionalHLogManager(this);
oldTransactionFlushTrigger = conf.getInt(OLD_TRANSACTION_FLUSH, DEFAULT_OLD_TRANSACTION_FLUSH);
}
@Override
protected void doReconstructionLog(final Path oldLogFile,
final long minSeqId, final long maxSeqId, final Progressable reporter)
throws UnsupportedEncodingException, IOException {
super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
Map<Long, List<BatchUpdate>> commitedTransactionsById = logManager
.getCommitsFromLog(oldLogFile, minSeqId, reporter);
if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
LOG.debug("found " + commitedTransactionsById.size()
+ " COMMITED transactions");
for (Entry<Long, List<BatchUpdate>> entry : commitedTransactionsById
.entrySet()) {
LOG.debug("Writing " + entry.getValue().size()
+ " updates for transaction " + entry.getKey());
for (BatchUpdate b : entry.getValue()) {
super.batchUpdate(b, true); // These are written to the WAL so they persist across restarts
}
}
// LOG.debug("Flushing cache"); // We must trigger a cache flush,
// otherwise
// we would ignore the log on subsequent failure
// if (!super.flushcache()) {
// LOG.warn("Did not flush cache");
// }
}
}
/**
* We need to make sure that we don't complete a cache flush between running
* transactions. If we did, then we would not find all log messages needed to
* restore the transaction, as some of them would be before the last
* "complete" flush id.
*/
@Override
protected long getCompleteCacheFlushSequenceId(final long currentSequenceId) {
long minPendingStartSequenceId = currentSequenceId;
for (TransactionState transactionState : transactionsById.values()) {
minPendingStartSequenceId = Math.min(minPendingStartSequenceId,
transactionState.getHLogStartSequenceId());
}
return minPendingStartSequenceId;
}
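// Worked example (numbers assumed for illustration): if the flush would normally
// complete at sequence id 120 but a still-pending transaction started at HLog
// sequence id 100, this returns 100. Edits with ids >= 100 then remain replayable
// from the log, so the pending transaction can still be recovered after a crash.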
public void beginTransaction(final long transactionId) throws IOException {
String key = String.valueOf(transactionId);
if (transactionsById.get(key) != null) {
TransactionState alias = getTransactionState(transactionId);
if (alias != null) {
alias.setStatus(Status.ABORTED);
retireTransaction(alias);
}
throw new IOException("Already exiting transaction id: " + key);
}
TransactionState state = new TransactionState(transactionId, super.getLog()
.getSequenceNumber());
// Order is important here
for (TransactionState commitPending : commitPendingTransactions) {
state.addTransactionToCheck(commitPending);
}
state.setStartSequenceNumber(nextSequenceId.get());
transactionsById.put(key, state);
try {
transactionLeases.createLease(key, new TransactionLeaseListener(key));
} catch (LeaseStillHeldException e) {
throw new RuntimeException(e);
}
LOG.debug("Begining transaction " + key + " in region "
+ super.getRegionInfo().getRegionNameAsString());
logManager.writeStartToLog(transactionId);
maybeTriggerOldTransactionFlush();
}
/**
* Fetch a single data item.
*
* @param transactionId
* @param row
* @param column
* @return column value
* @throws IOException
*/
public Cell get(final long transactionId, final byte[] row,
final byte[] column) throws IOException {
Cell[] results = get(transactionId, row, column, 1);
return (results == null || results.length == 0) ? null : results[0];
}
/**
* Fetch multiple versions of a single data item
*
* @param transactionId
* @param row
* @param column
* @param numVersions
* @return array of values one element per version
* @throws IOException
*/
public Cell[] get(final long transactionId, final byte[] row,
final byte[] column, final int numVersions) throws IOException {
return get(transactionId, row, column, Long.MAX_VALUE, numVersions);
}
/**
* Fetch multiple versions of a single data item, with timestamp.
*
* @param transactionId
* @param row
* @param column
* @param timestamp
* @param numVersions
* @return array of values one element per version that matches the timestamp
* @throws IOException
*/
public Cell[] get(final long transactionId, final byte[] row,
final byte[] column, final long timestamp, final int numVersions)
throws IOException {
TransactionState state = getTransactionState(transactionId);
state.addRead(row);
Cell[] localCells = state.localGet(row, column, timestamp);
if (localCells != null && localCells.length > 0) {
LOG
.trace("Transactional get of something we've written in the same transaction "
+ transactionId);
LOG.trace("row: " + Bytes.toString(row));
LOG.trace("col: " + Bytes.toString(column));
LOG.trace("numVersions: " + numVersions);
for (Cell cell : localCells) {
LOG.trace("cell: " + Bytes.toString(cell.getValue()));
}
if (numVersions > 1) {
Cell[] globalCells = get(row, column, timestamp, numVersions - 1);
Cell[] result = new Cell[globalCells.length + localCells.length];
System.arraycopy(localCells, 0, result, 0, localCells.length);
System.arraycopy(globalCells, 0, result, localCells.length,
globalCells.length);
return result;
}
return localCells;
}
return get(row, column, timestamp, numVersions);
}
/**
* Fetch all the columns for the indicated row at a specified timestamp.
* Returns a TreeMap that maps column names to values.
*
* @param transactionId
* @param row
* @param columns Array of columns you'd like to retrieve. When null, get all.
* @param ts
* @return Map<columnName, Cell> values
* @throws IOException
*/
public Map<byte[], Cell> getFull(final long transactionId, final byte[] row,
final Set<byte[]> columns, final long ts) throws IOException {
TransactionState state = getTransactionState(transactionId);
state.addRead(row);
Map<byte[], Cell> localCells = state.localGetFull(row, columns, ts);
if (localCells != null && localCells.size() > 0) {
LOG
.trace("Transactional get of something we've written in the same transaction "
+ transactionId);
LOG.trace("row: " + Bytes.toString(row));
for (Entry<byte[], Cell> entry : localCells.entrySet()) {
LOG.trace("col: " + Bytes.toString(entry.getKey()));
LOG.trace("cell: " + Bytes.toString(entry.getValue().getValue()));
}
Map<byte[], Cell> internalResults = getFull(row, columns, ts, null);
internalResults.putAll(localCells);
return internalResults;
}
return getFull(row, columns, ts, null);
}
/**
* Return an iterator that scans over the HRegion, returning the indicated
* columns for only the rows that match the data filter. This Iterator must be
* closed by the caller.
*
* @param transactionId
* @param cols columns to scan. If column name is a column family, all columns
* of the specified column family are returned. Its also possible to pass a
* regex in the column qualifier. A column qualifier is judged to be a regex
* if it contains at least one of the following characters:
* <code>\+|^&*$[]]}{)(</code>.
* @param firstRow row which is the starting point of the scan
* @param timestamp only return rows whose timestamp is <= this value
* @param filter row filter
* @return InternalScanner
* @throws IOException
*/
public InternalScanner getScanner(final long transactionId,
final byte[][] cols, final byte[] firstRow, final long timestamp,
final RowFilterInterface filter) throws IOException {
return new ScannerWrapper(transactionId, super.getScanner(cols, firstRow,
timestamp, filter));
}
/**
* Add a write to the transaction. Does not get applied until commit process.
*
* @param b
* @throws IOException
*/
public void batchUpdate(final long transactionId, final BatchUpdate b)
throws IOException {
TransactionState state = getTransactionState(transactionId);
state.addWrite(b);
logManager.writeUpdateToLog(transactionId, b);
}
/**
* Add a delete to the transaction. Does not get applied until commit process.
* FIXME, not sure about this approach
*
* @param transactionId
* @param row
* @param timestamp
* @throws IOException
*/
public void deleteAll(final long transactionId, final byte[] row,
final long timestamp) throws IOException {
TransactionState state = getTransactionState(transactionId);
long now = System.currentTimeMillis();
for (HStore store : super.stores.values()) {
List<HStoreKey> keys = store.getKeys(new HStoreKey(row, timestamp),
ALL_VERSIONS, now);
BatchUpdate deleteUpdate = new BatchUpdate(row, timestamp);
for (HStoreKey key : keys) {
deleteUpdate.delete(key.getColumn());
}
state.addWrite(deleteUpdate);
logManager.writeUpdateToLog(transactionId, deleteUpdate);
}
}
public boolean commitRequest(final long transactionId) throws IOException {
synchronized (commitCheckLock) {
TransactionState state = getTransactionState(transactionId);
if (state == null) {
return false;
}
if (hasConflict(state)) {
state.setStatus(Status.ABORTED);
retireTransaction(state);
return false;
}
// No conflicts, we can commit.
LOG.trace("No conflicts for transaction " + transactionId
+ " found in region " + super.getRegionInfo().getRegionNameAsString()
+ ". Voting for commit");
state.setStatus(Status.COMMIT_PENDING);
// If there are writes we must keep a record of the transaction
if (state.getWriteSet().size() > 0) {
// Order is important
commitPendingTransactions.add(state);
state.setSequenceNumber(nextSequenceId.getAndIncrement());
commitedTransactionsBySequenceNumber.put(state.getSequenceNumber(),
state);
}
return true;
}
}
private boolean hasConflict(final TransactionState state) {
// Check transactions that were committed while we were running
for (int i = state.getStartSequenceNumber(); i < nextSequenceId.get(); i++) {
TransactionState other = commitedTransactionsBySequenceNumber.get(i);
if (other == null) {
continue;
}
state.addTransactionToCheck(other);
}
return state.hasConflict();
}
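// Illustrative scenario (numbers assumed): transaction A begins when nextSequenceId
// is 5, so its startSequenceNumber is 5, and it reads row "x". While A is running,
// transaction B writes row "x" and votes to commit, taking commit sequence number 5
// and bumping nextSequenceId to 6. When A later calls commitRequest(), the loop
// above scans sequence numbers 5..5, finds B in commitedTransactionsBySequenceNumber,
// and state.hasConflict() detects the read/write overlap, so A is aborted.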
/**
* Commit the transaction.
*
* @param transactionId
* @throws IOException
*/
public void commit(final long transactionId) throws IOException {
TransactionState state;
try {
state = getTransactionState(transactionId);
} catch (UnknownTransactionException e) {
LOG.fatal("Asked to commit unknown transaction: " + transactionId
+ " in region " + super.getRegionInfo().getRegionNameAsString());
// FIXME Write to the transaction log that this transaction was corrupted
throw e;
}
if (!state.getStatus().equals(Status.COMMIT_PENDING)) {
LOG.fatal("Asked to commit a non pending transaction");
// FIXME Write to the transaction log that this transaction was corrupted
throw new IOException("commit failure");
}
commit(state);
}
/**
* Abort the transaction.
*
* @param transactionId
* @throws IOException
*/
public void abort(final long transactionId) throws IOException {
TransactionState state;
try {
state = getTransactionState(transactionId);
} catch (UnknownTransactionException e) {
LOG.error("Asked to abort unknown transaction: " + transactionId);
return;
}
state.setStatus(Status.ABORTED);
if (state.getWriteSet().size() > 0) {
logManager.writeAbortToLog(state.getTransactionId());
}
// Following removes needed if we have voted
if (state.getSequenceNumber() != null) {
commitedTransactionsBySequenceNumber.remove(state.getSequenceNumber());
}
commitPendingTransactions.remove(state);
retireTransaction(state);
}
private void commit(final TransactionState state) throws IOException {
LOG.debug("Commiting transaction: " + state.toString() + " to "
+ super.getRegionInfo().getRegionNameAsString());
if (state.getWriteSet().size() > 0) {
logManager.writeCommitToLog(state.getTransactionId());
}
for (BatchUpdate update : state.getWriteSet()) {
super.batchUpdate(update, false); // Don't need to WAL these
// FIXME, maybe these should be WALed so we don't need to look so far back.
}
state.setStatus(Status.COMMITED);
if (state.getWriteSet().size() > 0
&& !commitPendingTransactions.remove(state)) {
LOG
.fatal("Commiting a non-query transaction that is not in commitPendingTransactions");
throw new IOException("commit failure"); // FIXME, how to handle?
}
retireTransaction(state);
}
// Cancel the lease and remove it from the lease lookup. This transaction may still
// live in commitedTransactionsBySequenceNumber and commitPendingTransactions
private void retireTransaction(final TransactionState state) {
String key = String.valueOf(state.getTransactionId());
try {
transactionLeases.cancelLease(key);
} catch (LeaseException e) {
// Ignore
}
transactionsById.remove(key);
}
private TransactionState getTransactionState(final long transactionId)
throws UnknownTransactionException {
String key = String.valueOf(transactionId);
TransactionState state = null;
state = transactionsById.get(key);
if (state == null) {
LOG.trace("Unknown transaction: " + key);
throw new UnknownTransactionException(key);
}
try {
transactionLeases.renewLease(key);
} catch (LeaseException e) {
throw new RuntimeException(e);
}
return state;
}
private void maybeTriggerOldTransactionFlush() {
if (commitedTransactionsBySequenceNumber.size() > oldTransactionFlushTrigger) {
removeUnNeededCommitedTransactions();
}
}
/**
* Cleanup references to committed transactions that are no longer needed.
*
*/
synchronized void removeUnNeededCommitedTransactions() {
Integer minStartSeqNumber = getMinStartSequenceNumber();
if (minStartSeqNumber == null) {
minStartSeqNumber = Integer.MAX_VALUE; // Remove all
}
int numRemoved = 0;
// Copy list to avoid conc update exception
for (Entry<Integer, TransactionState> entry : new LinkedList<Entry<Integer, TransactionState>>(
commitedTransactionsBySequenceNumber.entrySet())) {
if (entry.getKey() >= minStartSeqNumber) {
break;
}
numRemoved = numRemoved
+ (commitedTransactionsBySequenceNumber.remove(entry.getKey()) == null ? 0
: 1);
}
if (numRemoved > 0) {
LOG.debug("Removed " + numRemoved
+ " commited transactions with sequence lower than "
+ minStartSeqNumber + ". Still have "
+ commitedTransactionsBySequenceNumber.size() + " left");
} else if (commitedTransactionsBySequenceNumber.size() > 0) {
LOG.debug("Could not remove any transactions, and still have "
+ commitedTransactionsBySequenceNumber.size() + " left");
}
}
private Integer getMinStartSequenceNumber() {
Integer min = null;
for (TransactionState transactionState : transactionsById.values()) {
if (min == null || transactionState.getStartSequenceNumber() < min) {
min = transactionState.getStartSequenceNumber();
}
}
return min;
}
// TODO, resolve from the global transaction log
@SuppressWarnings("unused")
private void resolveTransactionFromLog(final long transactionId) {
throw new RuntimeException("Globaql transaction log is not Implemented");
}
private class TransactionLeaseListener implements LeaseListener {
private final String transactionName;
TransactionLeaseListener(final String n) {
this.transactionName = n;
}
/** {@inheritDoc} */
public void leaseExpired() {
LOG.info("Transaction " + this.transactionName + " lease expired");
TransactionState s = null;
synchronized (transactionsById) {
s = transactionsById.remove(transactionName);
}
if (s == null) {
LOG.warn("Unknown transaction expired " + this.transactionName);
return;
}
switch (s.getStatus()) {
case PENDING:
s.setStatus(Status.ABORTED); // Other transactions may have a ref
break;
case COMMIT_PENDING:
LOG.info("Transaction " + s.getTransactionId()
+ " expired in COMMIT_PENDING state");
LOG.info("Checking transaction status in transaction log");
resolveTransactionFromLog(s.getTransactionId());
break;
default:
LOG.warn("Unexpected status on expired lease");
}
}
}
/** Wrapper which keeps track of rows returned by scanner. */
private class ScannerWrapper implements InternalScanner {
private long transactionId;
private InternalScanner scanner;
public ScannerWrapper(final long transactionId,
final InternalScanner scanner) {
this.transactionId = transactionId;
this.scanner = scanner;
}
public void close() throws IOException {
scanner.close();
}
public boolean isMultipleMatchScanner() {
return scanner.isMultipleMatchScanner();
}
public boolean isWildcardScanner() {
return scanner.isWildcardScanner();
}
public boolean next(final HStoreKey key,
final SortedMap<byte[], Cell> results) throws IOException {
boolean result = scanner.next(key, results);
TransactionState state = getTransactionState(transactionId);
state.setHasScan(true);
// FIXME, not using row, just claiming read over the whole region. We are
// being very conservative on scans to avoid phantom reads.
state.addRead(key.getRow());
if (result) {
Map<byte[], Cell> localWrites = state.localGetFull(key.getRow(), null,
Integer.MAX_VALUE);
if (localWrites != null) {
LOG
.info("Scanning over row that has been writen to "
+ transactionId);
for (Entry<byte[], Cell> entry : localWrites.entrySet()) {
results.put(entry.getKey(), entry.getValue());
}
}
}
return result;
}
}
}
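// Client-side usage sketch (illustrative; the client classes are added elsewhere in
// this patch and exercised in StressTestTransactions): a transfer between two rows
// of a TransactionalTable might look like
//
//   TransactionState tx = transactionManager.beginTransaction();
//   int a = Bytes.toInt(table.get(tx, row1, COL).getValue());
//   int b = Bytes.toInt(table.get(tx, row2, COL).getValue());
//   BatchUpdate u1 = new BatchUpdate(row1);
//   u1.put(COL, Bytes.toBytes(a - amount));
//   table.commit(tx, u1);
//   BatchUpdate u2 = new BatchUpdate(row2);
//   u2.put(COL, Bytes.toBytes(b + amount));
//   table.commit(tx, u2);
//   transactionManager.tryCommit(tx); // throws CommitUnsuccessfulException on conflict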

View File

@@ -0,0 +1,296 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.transactional;
import java.io.IOException;
import java.lang.Thread.UncaughtExceptionHandler;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HServerAddress;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
import org.apache.hadoop.hbase.RemoteExceptionHandler;
import org.apache.hadoop.hbase.filter.RowFilterInterface;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.io.HbaseMapWritable;
import org.apache.hadoop.hbase.io.RowResult;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.util.Progressable;
/**
* RegionServer with support for transactions. Transactional logic is at the
* region level, so we mostly just delegate to the appropriate
* TransactionalRegion.
*/
public class TransactionalRegionServer extends HRegionServer implements
TransactionalRegionInterface {
static final Log LOG = LogFactory.getLog(TransactionalRegionServer.class);
private final CleanOldTransactionsChore cleanOldTransactionsThread;
public TransactionalRegionServer(final HBaseConfiguration conf)
throws IOException {
this(new HServerAddress(conf.get(REGIONSERVER_ADDRESS,
DEFAULT_REGIONSERVER_ADDRESS)), conf);
}
public TransactionalRegionServer(final HServerAddress address,
final HBaseConfiguration conf) throws IOException {
super(address, conf);
cleanOldTransactionsThread = new CleanOldTransactionsChore(this,
super.stopRequested);
}
/** {@inheritDoc} */
@Override
public long getProtocolVersion(final String protocol, final long clientVersion)
throws IOException {
if (protocol.equals(TransactionalRegionInterface.class.getName())) {
return TransactionalRegionInterface.versionID;
}
return super.getProtocolVersion(protocol, clientVersion);
}
@Override
protected void init(final MapWritable c) throws IOException {
super.init(c);
String n = Thread.currentThread().getName();
UncaughtExceptionHandler handler = new UncaughtExceptionHandler() {
public void uncaughtException(final Thread t, final Throwable e) {
abort();
LOG.fatal("Set stop flag in " + t.getName(), e);
}
};
Threads.setDaemonThreadRunning(this.cleanOldTransactionsThread, n
+ ".oldTransactionCleaner", handler);
}
@Override
protected HRegion instantiateRegion(final HRegionInfo regionInfo)
throws IOException {
HRegion r = new TransactionalRegion(HTableDescriptor.getTableDir(super
.getRootDir(), regionInfo.getTableDesc().getName()), super.log, super
.getFileSystem(), super.conf, regionInfo, super.getFlushRequester());
r.initialize(null, new Progressable() {
public void progress() {
addProcessingMessage(regionInfo);
}
});
return r;
}
protected TransactionalRegion getTransactionalRegion(final byte[] regionName)
throws NotServingRegionException {
return (TransactionalRegion) super.getRegion(regionName);
}
public void abort(final byte[] regionName, final long transactionId)
throws IOException {
checkOpen();
super.getRequestCount().incrementAndGet();
try {
getTransactionalRegion(regionName).abort(transactionId);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
public void batchUpdate(final long transactionId, final byte[] regionName,
final BatchUpdate b) throws IOException {
checkOpen();
super.getRequestCount().incrementAndGet();
try {
getTransactionalRegion(regionName).batchUpdate(transactionId, b);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
public void commit(final byte[] regionName, final long transactionId)
throws IOException {
checkOpen();
super.getRequestCount().incrementAndGet();
try {
getTransactionalRegion(regionName).commit(transactionId);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
public boolean commitRequest(final byte[] regionName, final long transactionId)
throws IOException {
checkOpen();
super.getRequestCount().incrementAndGet();
try {
return getTransactionalRegion(regionName).commitRequest(transactionId);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
public Cell get(final long transactionId, final byte[] regionName,
final byte[] row, final byte[] column) throws IOException {
checkOpen();
super.getRequestCount().incrementAndGet();
try {
return getTransactionalRegion(regionName).get(transactionId, row, column);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
public Cell[] get(final long transactionId, final byte[] regionName,
final byte[] row, final byte[] column, final int numVersions)
throws IOException {
checkOpen();
super.getRequestCount().incrementAndGet();
try {
return getTransactionalRegion(regionName).get(transactionId, row, column,
numVersions);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
public Cell[] get(final long transactionId, final byte[] regionName,
final byte[] row, final byte[] column, final long timestamp,
final int numVersions) throws IOException {
checkOpen();
super.getRequestCount().incrementAndGet();
try {
return getTransactionalRegion(regionName).get(transactionId, row, column,
timestamp, numVersions);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
public RowResult getRow(final long transactionId, final byte[] regionName,
final byte[] row, final long ts) throws IOException {
return getRow(transactionId, regionName, row, null, ts);
}
public RowResult getRow(final long transactionId, final byte[] regionName,
final byte[] row, final byte[][] columns) throws IOException {
return getRow(transactionId, regionName, row, columns,
HConstants.LATEST_TIMESTAMP);
}
public RowResult getRow(final long transactionId, final byte[] regionName,
final byte[] row, final byte[][] columns, final long ts)
throws IOException {
checkOpen();
super.getRequestCount().incrementAndGet();
try {
// convert the columns array into a set so it's easy to check later.
Set<byte[]> columnSet = null;
if (columns != null) {
columnSet = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
columnSet.addAll(Arrays.asList(columns));
}
TransactionalRegion region = getTransactionalRegion(regionName);
Map<byte[], Cell> map = region.getFull(transactionId, row, columnSet, ts);
HbaseMapWritable<byte[], Cell> result = new HbaseMapWritable<byte[], Cell>();
result.putAll(map);
return new RowResult(row, result);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
/** {@inheritDoc} */
public void deleteAll(final long transactionId, final byte[] regionName,
final byte[] row, final long timestamp) throws IOException {
checkOpen();
super.getRequestCount().incrementAndGet();
try {
TransactionalRegion region = getTransactionalRegion(regionName);
region.deleteAll(transactionId, row, timestamp);
} catch (IOException e) {
checkFileSystem();
throw e;
}
}
public long openScanner(final long transactionId, final byte[] regionName,
final byte[][] cols, final byte[] firstRow, final long timestamp,
final RowFilterInterface filter) throws IOException {
checkOpen();
NullPointerException npe = null;
if (regionName == null) {
npe = new NullPointerException("regionName is null");
} else if (cols == null) {
npe = new NullPointerException("columns to scan is null");
} else if (firstRow == null) {
npe = new NullPointerException("firstRow for scanner is null");
}
if (npe != null) {
IOException io = new IOException("Invalid arguments to openScanner");
io.initCause(npe);
throw io;
}
super.getRequestCount().incrementAndGet();
try {
TransactionalRegion r = getTransactionalRegion(regionName);
long scannerId = -1L;
InternalScanner s = r.getScanner(transactionId, cols, firstRow,
timestamp, filter);
scannerId = super.addScanner(s);
return scannerId;
} catch (IOException e) {
LOG.error("Error opening scanner (fsOk: " + this.fsOk + ")",
RemoteExceptionHandler.checkIOException(e));
checkFileSystem();
throw e;
}
}
public void beginTransaction(final long transactionId, final byte[] regionName)
throws IOException {
getTransactionalRegion(regionName).beginTransaction(transactionId);
}
}

View File

@@ -167,9 +167,11 @@ public abstract class HBaseTestCase extends TestCase {
protected HRegion openClosedRegion(final HRegion closedRegion)
throws IOException {
return new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(),
closedRegion.getFilesystem(), closedRegion.getConf(),
closedRegion.getRegionInfo(), null, null);
HRegion r = new HRegion(closedRegion.getBaseDir(), closedRegion.getLog(),
closedRegion.getFilesystem(), closedRegion.getConf(),
closedRegion.getRegionInfo(), null);
r.initialize(null, null);
return r;
}
/**

View File

@@ -0,0 +1,421 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.atomic.AtomicBoolean;
import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Stress test the transaction functionality. This requires running a
* {@link TransactionalRegionServer}. We run many threads doing reads/writes
* which may conflict with each other. We have two types of transactions, those
* which operate on rows of a single table, and those which operate on rows
* across multiple tables. Each transaction type has a modification operation
* which changes two values while maintaining the sum. Also each transaction
* type has a consistency-check operation which sums all rows and verifies that
* the sum is as expected.
*/
public class StressTestTransactions extends HBaseClusterTestCase {
private static final Log LOG = LogFactory
.getLog(StressTestTransactions.class);
private static final int NUM_TABLES = 3;
private static final int NUM_ST_ROWS = 3;
private static final int NUM_MT_ROWS = 3;
private static final int NUM_TRANSACTIONS_PER_THREAD = 100;
private static final int NUM_SINGLE_TABLE_THREADS = 6;
private static final int NUM_MULTI_TABLE_THREADS = 6;
private static final int PRE_COMMIT_SLEEP = 10;
private static final Random RAND = new Random();
private static final byte[] FAMILY = Bytes.toBytes("family:");
private static final byte[] COL = Bytes.toBytes("family:a");
private HBaseAdmin admin;
private TransactionalTable[] tables;
private TransactionManager transactionManager;
/** constructor */
public StressTestTransactions() {
conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
.getName());
conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
.getName());
}
@Override
protected void setUp() throws Exception {
super.setUp();
tables = new TransactionalTable[NUM_TABLES];
for (int i = 0; i < tables.length; i++) {
HTableDescriptor desc = new HTableDescriptor(makeTableName(i));
desc.addFamily(new HColumnDescriptor(FAMILY));
admin = new HBaseAdmin(conf);
admin.createTable(desc);
tables[i] = new TransactionalTable(conf, desc.getName());
}
transactionManager = new TransactionManager(conf);
}
private String makeTableName(final int i) {
return "table" + i;
}
private void writeInitalValues() throws IOException {
for (TransactionalTable table : tables) {
for (int i = 0; i < NUM_ST_ROWS; i++) {
byte[] row = makeSTRow(i);
BatchUpdate b = new BatchUpdate(row);
b.put(COL, Bytes.toBytes(SingleTableTransactionThread.INITIAL_VALUE));
table.commit(b);
}
for (int i = 0; i < NUM_MT_ROWS; i++) {
byte[] row = makeMTRow(i);
BatchUpdate b = new BatchUpdate(row);
b.put(COL, Bytes.toBytes(MultiTableTransactionThread.INITIAL_VALUE));
table.commit(b);
}
}
}
private byte[] makeSTRow(final int i) {
return Bytes.toBytes("st" + i);
}
private byte[] makeMTRow(final int i) {
return Bytes.toBytes("mt" + i);
}
private static int nextThreadNum = 1;
private static final AtomicBoolean stopRequest = new AtomicBoolean(false);
private static final AtomicBoolean consistencyFailure = new AtomicBoolean(
false);
// Thread which runs transactions
abstract class TransactionThread extends Thread {
private int numRuns = 0;
private int numAborts = 0;
private int numUnknowns = 0;
public TransactionThread(final String namePrefix) {
super.setName(namePrefix + "transaction " + nextThreadNum++);
}
@Override
public void run() {
for (int i = 0; i < NUM_TRANSACTIONS_PER_THREAD; i++) {
if (stopRequest.get()) {
return;
}
try {
numRuns++;
transaction();
} catch (UnknownTransactionException e) {
numUnknowns++;
} catch (IOException e) {
throw new RuntimeException(e);
} catch (CommitUnsuccessfulException e) {
numAborts++;
}
}
}
protected abstract void transaction() throws IOException,
CommitUnsuccessfulException;
public int getNumAborts() {
return numAborts;
}
public int getNumUnknowns() {
return numUnknowns;
}
protected void preCommitSleep() {
try {
Thread.sleep(PRE_COMMIT_SLEEP);
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
protected void consistencyFailure() {
LOG.fatal("Consistency failure");
stopRequest.set(true);
consistencyFailure.set(true);
}
/**
* Get the number of transactions attempted by this thread.
*
* @return the number of transactions attempted
*/
public int getNumRuns() {
return numRuns;
}
}
// Atomically change the values of two rows while maintaining their sum.
// This should preserve the global sum of the rows, which is also checked
// with a transaction.
private class SingleTableTransactionThread extends TransactionThread {
private static final int INITIAL_VALUE = 10;
public static final int TOTAL_SUM = INITIAL_VALUE * NUM_ST_ROWS;
private static final int MAX_TRANSFER_AMT = 100;
private TransactionalTable table;
boolean doCheck = false;
public SingleTableTransactionThread() {
super("single table ");
}
@Override
protected void transaction() throws IOException,
CommitUnsuccessfulException {
if (doCheck) {
checkTotalSum();
} else {
doSingleRowChange();
}
doCheck = !doCheck;
}
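// Pick a random table and two distinct rows, then move a random amount from
// one row to the other inside a single transaction, preserving their sum.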
private void doSingleRowChange() throws IOException,
CommitUnsuccessfulException {
table = tables[RAND.nextInt(NUM_TABLES)];
int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
- MAX_TRANSFER_AMT;
int row1Index = RAND.nextInt(NUM_ST_ROWS);
int row2Index;
do {
row2Index = RAND.nextInt(NUM_ST_ROWS);
} while (row2Index == row1Index);
byte[] row1 = makeSTRow(row1Index);
byte[] row2 = makeSTRow(row2Index);
TransactionState transactionState = transactionManager.beginTransaction();
int row1Amount = Bytes.toInt(table.get(transactionState, row1, COL)
.getValue());
int row2Amount = Bytes.toInt(table.get(transactionState, row2, COL)
.getValue());
row1Amount -= transferAmount;
row2Amount += transferAmount;
BatchUpdate update = new BatchUpdate(row1);
update.put(COL, Bytes.toBytes(row1Amount));
table.commit(transactionState, update);
update = new BatchUpdate(row2);
update.put(COL, Bytes.toBytes(row2Amount));
table.commit(transactionState, update);
super.preCommitSleep();
transactionManager.tryCommit(transactionState);
LOG.debug("Commited");
}
// Check the table we last mutated
private void checkTotalSum() throws IOException,
CommitUnsuccessfulException {
TransactionState transactionState = transactionManager.beginTransaction();
int totalSum = 0;
for (int i = 0; i < NUM_ST_ROWS; i++) {
totalSum += Bytes.toInt(table.get(transactionState, makeSTRow(i), COL)
.getValue());
}
transactionManager.tryCommit(transactionState);
if (TOTAL_SUM != totalSum) {
super.consistencyFailure();
}
}
}
// Similar to SingleTable, but this time we maintain consistency across tables
// rather than rows
private class MultiTableTransactionThread extends TransactionThread {
private static final int INITIAL_VALUE = 1000;
public static final int TOTAL_SUM = INITIAL_VALUE * NUM_TABLES;
private static final int MAX_TRANSFER_AMT = 100;
private byte[] row;
boolean doCheck = false;
public MultiTableTransactionThread() {
super("multi table");
}
@Override
protected void transaction() throws IOException,
CommitUnsuccessfulException {
if (doCheck) {
checkTotalSum();
} else {
doSingleRowChange();
}
doCheck = !doCheck;
}
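// Pick a random row and two distinct tables, then move a random amount
// between the two tables' values for that row inside a single transaction.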
private void doSingleRowChange() throws IOException,
CommitUnsuccessfulException {
row = makeMTRow(RAND.nextInt(NUM_MT_ROWS));
int transferAmount = RAND.nextInt(MAX_TRANSFER_AMT * 2)
- MAX_TRANSFER_AMT;
int table1Index = RAND.nextInt(tables.length);
int table2Index;
do {
table2Index = RAND.nextInt(tables.length);
} while (table2Index == table1Index);
TransactionalTable table1 = tables[table1Index];
TransactionalTable table2 = tables[table2Index];
TransactionState transactionState = transactionManager.beginTransaction();
int table1Amount = Bytes.toInt(table1.get(transactionState, row, COL)
.getValue());
int table2Amount = Bytes.toInt(table2.get(transactionState, row, COL)
.getValue());
table1Amount -= transferAmount;
table2Amount += transferAmount;
BatchUpdate update = new BatchUpdate(row);
update.put(COL, Bytes.toBytes(table1Amount));
table1.commit(transactionState, update);
update = new BatchUpdate(row);
update.put(COL, Bytes.toBytes(table2Amount));
table2.commit(transactionState, update);
super.preCommitSleep();
transactionManager.tryCommit(transactionState);
LOG.trace(Bytes.toString(table1.getTableName()) + ": " + table1Amount);
LOG.trace(Bytes.toString(table2.getTableName()) + ": " + table2Amount);
}
private void checkTotalSum() throws IOException,
CommitUnsuccessfulException {
TransactionState transactionState = transactionManager.beginTransaction();
int totalSum = 0;
int[] amounts = new int[tables.length];
for (int i = 0; i < tables.length; i++) {
int amount = Bytes.toInt(tables[i].get(transactionState, row, COL)
.getValue());
amounts[i] = amount;
totalSum += amount;
}
transactionManager.tryCommit(transactionState);
for (int i = 0; i < tables.length; i++) {
LOG.trace(Bytes.toString(tables[i].getTableName()) + ": " + amounts[i]);
}
if (TOTAL_SUM != totalSum) {
super.consistencyFailure();
}
}
}
public void testStressTransactions() throws IOException, InterruptedException {
writeInitialValues();
List<TransactionThread> transactionThreads = new LinkedList<TransactionThread>();
for (int i = 0; i < NUM_SINGLE_TABLE_THREADS; i++) {
TransactionThread transactionThread = new SingleTableTransactionThread();
transactionThread.start();
transactionThreads.add(transactionThread);
}
for (int i = 0; i < NUM_MULTI_TABLE_THREADS; i++) {
TransactionThread transactionThread = new MultiTableTransactionThread();
transactionThread.start();
transactionThreads.add(transactionThread);
}
for (TransactionThread transactionThread : transactionThreads) {
transactionThread.join();
}
for (TransactionThread transactionThread : transactionThreads) {
LOG.info(transactionThread.getName() + " done with "
+ transactionThread.getNumAborts() + " aborts, and "
+ transactionThread.getNumUnknowns() + " unknown transactions of "
+ transactionThread.getNumRuns());
}
doFinalConsistencyChecks();
}
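// After all threads have finished, verify both invariants outside of any
// transaction: each table's single-table rows still sum to
// SingleTableTransactionThread.TOTAL_SUM, and each multi-table row still sums
// to MultiTableTransactionThread.TOTAL_SUM across all tables.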
private void doFinalConsistencyChecks() throws IOException {
int[] mtSums = new int[NUM_MT_ROWS];
for (int i = 0; i < mtSums.length; i++) {
mtSums[i] = 0;
}
for (TransactionalTable table : tables) {
int thisTableSum = 0;
for (int i = 0; i < NUM_ST_ROWS; i++) {
byte[] row = makeSTRow(i);
thisTableSum += Bytes.toInt(table.get(row, COL).getValue());
}
Assert.assertEquals(SingleTableTransactionThread.TOTAL_SUM, thisTableSum);
for (int i = 0; i < NUM_MT_ROWS; i++) {
byte[] row = makeMTRow(i);
mtSums[i] += Bytes.toInt(table.get(row, COL).getValue());
}
}
for (int mtSum : mtSums) {
Assert.assertEquals(MultiTableTransactionThread.TOTAL_SUM, mtSum);
}
}
}

View File

@ -0,0 +1,143 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client.transactional;
import java.io.IOException;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.io.Cell;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
/**
* Test the transaction functionality. This requires running a
* {@link TransactionalRegionServer}.
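*
* <p>The conflict test below exercises the optimistic concurrency check: a
* transaction that read a value which another transaction has since committed
* a write to will fail in {@link TransactionManager#tryCommit} with a
* {@link CommitUnsuccessfulException}.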
*/
public class TestTransactions extends HBaseClusterTestCase {
private static final String TABLE_NAME = "table1";
private static final byte[] FAMILY = Bytes.toBytes("family:");
private static final byte[] COL_A = Bytes.toBytes("family:a");
private static final byte[] ROW1 = Bytes.toBytes("row1");
private static final byte[] ROW2 = Bytes.toBytes("row2");
private static final byte[] ROW3 = Bytes.toBytes("row3");
private HBaseAdmin admin;
private TransactionalTable table;
private TransactionManager transactionManager;
/** constructor */
public TestTransactions() {
conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
.getName());
conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
.getName());
}
@Override
protected void setUp() throws Exception {
super.setUp();
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
desc.addFamily(new HColumnDescriptor(FAMILY));
admin = new HBaseAdmin(conf);
admin.createTable(desc);
table = new TransactionalTable(conf, desc.getName());
transactionManager = new TransactionManager(conf);
writeInitialRow();
}
private void writeInitialRow() throws IOException {
BatchUpdate update = new BatchUpdate(ROW1);
update.put(COL_A, Bytes.toBytes(1));
table.commit(update);
}
public void testSimpleTransaction() throws IOException,
CommitUnsuccessfulException {
TransactionState transactionState = makeTransaction1();
transactionManager.tryCommit(transactionState);
}
public void testTwoTransactionsWithoutConflict() throws IOException,
CommitUnsuccessfulException {
TransactionState transactionState1 = makeTransaction1();
TransactionState transactionState2 = makeTransaction2();
transactionManager.tryCommit(transactionState1);
transactionManager.tryCommit(transactionState2);
}
public void testTwoTransactionsWithConflict() throws IOException,
CommitUnsuccessfulException {
TransactionState transactionState1 = makeTransaction1();
TransactionState transactionState2 = makeTransaction2();
transactionManager.tryCommit(transactionState2);
try {
transactionManager.tryCommit(transactionState1);
fail();
} catch (CommitUnsuccessfulException e) {
// Good
}
}
// Read ROW1:COL_A and write its value to ROW2:COL_A and ROW3:COL_A
private TransactionState makeTransaction1() throws IOException {
TransactionState transactionState = transactionManager.beginTransaction();
Cell row1_A = table.get(transactionState, ROW1, COL_A);
BatchUpdate write1 = new BatchUpdate(ROW2);
write1.put(COL_A, row1_A.getValue());
table.commit(transactionState, write1);
BatchUpdate write2 = new BatchUpdate(ROW3);
write2.put(COL_A, row1_A.getValue());
table.commit(transactionState, write2);
return transactionState;
}
// Read ROW1:COL_A, increment its (integer) value, and write it back
private TransactionState makeTransaction2() throws IOException {
TransactionState transactionState = transactionManager.beginTransaction();
Cell row1_A = table.get(transactionState, ROW1, COL_A);
int value = Bytes.toInt(row1_A.getValue());
BatchUpdate write = new BatchUpdate(ROW1);
write.put(COL_A, Bytes.toBytes(value + 1));
table.commit(transactionState, write);
return transactionState;
}
}

View File

@ -0,0 +1,285 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.transactional;
import java.io.IOException;
import java.util.Collection;
import java.util.List;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseClusterTestCase;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.LocalHBaseCluster;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.client.Scanner;
import org.apache.hadoop.hbase.client.transactional.CommitUnsuccessfulException;
import org.apache.hadoop.hbase.client.transactional.TransactionManager;
import org.apache.hadoop.hbase.client.transactional.TransactionState;
import org.apache.hadoop.hbase.client.transactional.TransactionalTable;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.HRegionServer;
import org.apache.hadoop.hbase.util.Bytes;
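/**
* Test recovery of transactional state from the HLog. Transactions are
* committed, the region server hosting the table is then aborted (or shut
* down), and a verification thread checks that the committed values are still
* visible once the region comes back online on another server.
*/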
public class TestHLogRecovery extends HBaseClusterTestCase {
private static final Log LOG = LogFactory.getLog(TestHLogRecovery.class);
private static final String TABLE_NAME = "table1";
private static final byte[] FAMILY = Bytes.toBytes("family:");
private static final byte[] COL_A = Bytes.toBytes("family:a");
private static final byte[] ROW1 = Bytes.toBytes("row1");
private static final byte[] ROW2 = Bytes.toBytes("row2");
private static final byte[] ROW3 = Bytes.toBytes("row3");
private static final int TOTAL_VALUE = 10;
private HBaseAdmin admin;
private TransactionManager transactionManager;
private TransactionalTable table;
/** constructor */
public TestHLogRecovery() {
super(2, false);
conf.set(HConstants.REGION_SERVER_CLASS, TransactionalRegionInterface.class
.getName());
conf.set(HConstants.REGION_SERVER_IMPL, TransactionalRegionServer.class
.getName());
// Set flush params so we don't get any flushes
// FIXME (defaults are probably fine)
// Copied from TestRegionServerExit
conf.setInt("ipc.client.connect.max.retries", 5); // reduce ipc retries
conf.setInt("ipc.client.timeout", 10000); // and ipc timeout
conf.setInt("hbase.client.pause", 10000); // increase client timeout
conf.setInt("hbase.client.retries.number", 10); // increase HBase retries
}
@Override
protected void setUp() throws Exception {
FileSystem.getLocal(conf).delete(new Path(conf.get(HConstants.HBASE_DIR)), true);
super.setUp();
HTableDescriptor desc = new HTableDescriptor(TABLE_NAME);
desc.addFamily(new HColumnDescriptor(FAMILY));
admin = new HBaseAdmin(conf);
admin.createTable(desc);
table = new TransactionalTable(conf, desc.getName());
transactionManager = new TransactionManager(conf);
writeInitialRows();
}
private void writeInitialRows() throws IOException {
BatchUpdate update = new BatchUpdate(ROW1);
update.put(COL_A, Bytes.toBytes(TOTAL_VALUE));
table.commit(update);
update = new BatchUpdate(ROW2);
update.put(COL_A, Bytes.toBytes(0));
table.commit(update);
update = new BatchUpdate(ROW3);
update.put(COL_A, Bytes.toBytes(0));
table.commit(update);
}
public void testWithoutFlush() throws IOException,
CommitUnsuccessfulException {
writeInitialRows();
TransactionState state1 = makeTransaction(false);
transactionManager.tryCommit(state1);
stopOrAbortRegionServer(true);
Thread t = startVerificationThread(1);
t.start();
threadDumpingJoin(t);
}
public void testWithFlushBeforeCommit() throws IOException,
CommitUnsuccessfulException {
writeInitialRows();
TransactionState state1 = makeTransaction(false);
flushRegionServer();
transactionManager.tryCommit(state1);
stopOrAbortRegionServer(true);
Thread t = startVerificationThread(1);
t.start();
threadDumpingJoin(t);
}
// FIXME, TODO
// public void testWithFlushBetweenTransactionWrites() {
// fail();
// }
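// Find the region server that is currently serving TABLE_NAME and ask it to
// flush that region to disk.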
private void flushRegionServer() {
List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
.getRegionThreads();
HRegion region = null;
int server = -1;
for (int i = 0; i < regionThreads.size() && server == -1; i++) {
HRegionServer s = regionThreads.get(i).getRegionServer();
Collection<HRegion> regions = s.getOnlineRegions();
for (HRegion r : regions) {
if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
server = i;
region = r;
}
}
}
if (server == -1) {
LOG.fatal("could not find region server serving table region");
fail();
}
((TransactionalRegionServer) regionThreads.get(server).getRegionServer())
.getFlushRequester().request(region);
}
/**
* Stop the region server serving TABLE_NAME.
*
* @param abort set to true if the region server should be aborted; if false it is
* just shut down.
*/
private void stopOrAbortRegionServer(final boolean abort) {
List<LocalHBaseCluster.RegionServerThread> regionThreads = cluster
.getRegionThreads();
int server = -1;
for (int i = 0; i < regionThreads.size(); i++) {
HRegionServer s = regionThreads.get(i).getRegionServer();
Collection<HRegion> regions = s.getOnlineRegions();
LOG.info("server: " + regionThreads.get(i).getName());
for (HRegion r : regions) {
LOG.info("region: " + r.getRegionInfo().getRegionNameAsString());
if (Bytes.equals(r.getTableDesc().getName(), Bytes.toBytes(TABLE_NAME))) {
server = i;
}
}
}
if (server == -1) {
LOG.fatal("could not find region server serving table region");
fail();
}
if (abort) {
this.cluster.abortRegionServer(server);
} else {
this.cluster.stopRegionServer(server);
}
LOG.info(this.cluster.waitOnRegionServer(server) + " has been "
+ (abort ? "aborted" : "shut down"));
}
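// Each committed transaction moves 2 out of ROW1 and 1 each into ROW2 and
// ROW3, so after numRuns committed runs we expect ROW1 == TOTAL_VALUE - 2 *
// numRuns and ROW2 == ROW3 == numRuns.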
private void verify(final int numRuns) throws IOException {
// Reads
int row1 = Bytes.toInt(table.get(ROW1, COL_A).getValue());
int row2 = Bytes.toInt(table.get(ROW2, COL_A).getValue());
int row3 = Bytes.toInt(table.get(ROW3, COL_A).getValue());
assertEquals(TOTAL_VALUE - 2 * numRuns, row1);
assertEquals(numRuns, row2);
assertEquals(numRuns, row3);
}
// Move 2 out of ROW1 and 1 into ROW2 and 1 into ROW3
private TransactionState makeTransaction(final boolean flushMidWay)
throws IOException {
TransactionState transactionState = transactionManager.beginTransaction();
// Reads
int row1 = Bytes.toInt(table.get(transactionState, ROW1, COL_A).getValue());
int row2 = Bytes.toInt(table.get(transactionState, ROW2, COL_A).getValue());
int row3 = Bytes.toInt(table.get(transactionState, ROW3, COL_A).getValue());
row1 -= 2;
row2 += 1;
row3 += 1;
if (flushMidWay) {
flushRegionServer();
}
// Writes
BatchUpdate write = new BatchUpdate(ROW1);
write.put(COL_A, Bytes.toBytes(row1));
table.commit(transactionState, write);
write = new BatchUpdate(ROW2);
write.put(COL_A, Bytes.toBytes(row2));
table.commit(transactionState, write);
write = new BatchUpdate(ROW3);
write.put(COL_A, Bytes.toBytes(row3));
table.commit(transactionState, write);
return transactionState;
}
/*
* Run verification in a thread so I can concurrently run a thread-dumper
* while we're waiting (because in this test sometimes the meta scanner looks
* to be stuck). @param numRuns Number of committed transaction runs to
* verify against. @return Verification thread. Caller needs to call start
* on it.
*/
private Thread startVerificationThread(final int numRuns) {
Runnable runnable = new Runnable() {
public void run() {
try {
// Now try to open a scanner on the table. Should stall until the
// region comes back up on another server.
HTable t = new HTable(conf, TABLE_NAME);
Scanner s = t.getScanner(new byte[][] { COL_A },
HConstants.EMPTY_START_ROW);
s.close();
} catch (IOException e) {
LOG.fatal("could not re-open meta table because", e);
fail();
}
Scanner scanner = null;
try {
verify(numRuns);
LOG.info("Success!");
} catch (Exception e) {
e.printStackTrace();
fail();
} finally {
if (scanner != null) {
LOG.info("Closing scanner " + scanner);
scanner.close();
}
}
}
};
return new Thread(runnable);
}
}

View File

@ -0,0 +1,310 @@
/**
* Copyright 2008 The Apache Software Foundation
*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.regionserver.transactional;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.dfs.MiniDFSCluster;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseTestCase;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.io.BatchUpdate;
import org.apache.hadoop.hbase.regionserver.HLog;
import org.apache.hadoop.hbase.util.Bytes;
/** JUnit test case for {@link TransactionalHLogManager} */
public class TestTransactionalHLogManager extends HBaseTestCase implements
HConstants {
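// Each test below drives the log manager through a write-ahead sequence like
// the following (a sketch; txId and update stand in for the values used in
// the individual tests):
//   writeStartToLog(txId);
//   writeUpdateToLog(txId, update);  // once per BatchUpdate in the transaction
//   writeCommitToLog(txId);          // or writeAbortToLog(txId)
// and then calls getCommitsFromLog(filename, -1, null) to check that only
// transactions whose commit record reached the log are reconstructed.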
private Path dir;
private MiniDFSCluster cluster;
final byte[] tableName = Bytes.toBytes("tablename");
final HTableDescriptor tableDesc = new HTableDescriptor(tableName);
final HRegionInfo regionInfo = new HRegionInfo(tableDesc,
HConstants.EMPTY_START_ROW, HConstants.EMPTY_END_ROW);
final byte[] row1 = Bytes.toBytes("row1");
final byte[] val1 = Bytes.toBytes("val1");
final byte[] row2 = Bytes.toBytes("row2");
final byte[] val2 = Bytes.toBytes("val2");
final byte[] row3 = Bytes.toBytes("row3");
final byte[] val3 = Bytes.toBytes("val3");
final byte[] col = Bytes.toBytes("col:A");
/** {@inheritDoc} */
@Override
public void setUp() throws Exception {
cluster = new MiniDFSCluster(conf, 2, true, (String[]) null);
// Set the hbase.rootdir to be the home directory in mini dfs.
this.conf.set(HConstants.HBASE_DIR, this.cluster.getFileSystem()
.getHomeDirectory().toString());
super.setUp();
this.dir = new Path("/hbase", getName());
if (fs.exists(dir)) {
fs.delete(dir, true);
}
}
/** {@inheritDoc} */
@Override
public void tearDown() throws Exception {
if (this.fs.exists(this.dir)) {
this.fs.delete(this.dir, true);
}
shutdownDfs(cluster);
super.tearDown();
}
/**
* @throws IOException
*/
public void testSingleCommit() throws IOException {
HLog log = new HLog(fs, dir, this.conf, null);
TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
regionInfo, conf);
// Log a start record, three updates, and a commit record for a single
// transaction, then verify the commit is recovered from the log.
long transactionId = 1;
logMangaer.writeStartToLog(transactionId);
BatchUpdate update1 = new BatchUpdate(row1);
update1.put(col, val1);
logMangaer.writeUpdateToLog(transactionId, update1);
BatchUpdate update2 = new BatchUpdate(row2);
update2.put(col, val2);
logMangaer.writeUpdateToLog(transactionId, update2);
BatchUpdate update3 = new BatchUpdate(row3);
update3.put(col, val3);
logMangaer.writeUpdateToLog(transactionId, update3);
logMangaer.writeCommitToLog(transactionId);
// log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
filename, -1, null);
assertEquals(1, commits.size());
assertTrue(commits.containsKey(transactionId));
assertEquals(3, commits.get(transactionId).size());
List<BatchUpdate> updates = commits.get(transactionId);
update1 = updates.get(0);
assertTrue(Bytes.equals(row1, update1.getRow()));
assertTrue(Bytes.equals(val1, update1.iterator().next().getValue()));
update2 = updates.get(1);
assertTrue(Bytes.equals(row2, update2.getRow()));
assertTrue(Bytes.equals(val2, update2.iterator().next().getValue()));
update3 = updates.get(2);
assertTrue(Bytes.equals(row3, update3.getRow()));
assertTrue(Bytes.equals(val3, update3.iterator().next().getValue()));
}
/**
* @throws IOException
*/
public void testSingleAbort() throws IOException {
HLog log = new HLog(fs, dir, this.conf, null);
TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
regionInfo, conf);
long transactionId = 1;
logMangaer.writeStartToLog(transactionId);
BatchUpdate update1 = new BatchUpdate(row1);
update1.put(col, val1);
logMangaer.writeUpdateToLog(transactionId, update1);
BatchUpdate update2 = new BatchUpdate(row2);
update2.put(col, val2);
logMangaer.writeUpdateToLog(transactionId, update2);
BatchUpdate update3 = new BatchUpdate(row3);
update3.put(col, val3);
logMangaer.writeUpdateToLog(transactionId, update3);
logMangaer.writeAbortToLog(transactionId);
// log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
filename, -1, null);
assertEquals(0, commits.size());
}
/**
* @throws IOException
*/
public void testInterleavedCommits() throws IOException {
HLog log = new HLog(fs, dir, this.conf, null);
TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
regionInfo, conf);
long transaction1Id = 1;
long transaction2Id = 2;
logMangaer.writeStartToLog(transaction1Id);
BatchUpdate update1 = new BatchUpdate(row1);
update1.put(col, val1);
logMangaer.writeUpdateToLog(transaction1Id, update1);
logMangaer.writeStartToLog(transaction2Id);
BatchUpdate update2 = new BatchUpdate(row2);
update2.put(col, val2);
logMangaer.writeUpdateToLog(transaction2Id, update2);
BatchUpdate update3 = new BatchUpdate(row3);
update3.put(col, val3);
logMangaer.writeUpdateToLog(transaction1Id, update3);
logMangaer.writeCommitToLog(transaction2Id);
logMangaer.writeCommitToLog(transaction1Id);
// log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
filename, -1, null);
assertEquals(2, commits.size());
assertEquals(2, commits.get(transaction1Id).size());
assertEquals(1, commits.get(transaction2Id).size());
}
/**
* @throws IOException
*/
public void testInterleavedAbortCommit() throws IOException {
HLog log = new HLog(fs, dir, this.conf, null);
TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
regionInfo, conf);
long transaction1Id = 1;
long transaction2Id = 2;
logMangaer.writeStartToLog(transaction1Id);
BatchUpdate update1 = new BatchUpdate(row1);
update1.put(col, val1);
logMangaer.writeUpdateToLog(transaction1Id, update1);
logMangaer.writeStartToLog(transaction2Id);
BatchUpdate update2 = new BatchUpdate(row2);
update2.put(col, val2);
logMangaer.writeUpdateToLog(transaction2Id, update2);
logMangaer.writeAbortToLog(transaction2Id);
BatchUpdate update3 = new BatchUpdate(row3);
update3.put(col, val3);
logMangaer.writeUpdateToLog(transaction1Id, update3);
logMangaer.writeCommitToLog(transaction1Id);
// log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
filename, -1, null);
assertEquals(1, commits.size());
assertEquals(2, commits.get(transaction1Id).size());
}
/**
* @throws IOException
*/
public void testInterleavedCommitAbort() throws IOException {
HLog log = new HLog(fs, dir, this.conf, null);
TransactionalHLogManager logMangaer = new TransactionalHLogManager(log, fs,
regionInfo, conf);
long transaction1Id = 1;
long transaction2Id = 2;
logMangaer.writeStartToLog(transaction1Id);
BatchUpdate update1 = new BatchUpdate(row1);
update1.put(col, val1);
logMangaer.writeUpdateToLog(transaction1Id, update1);
logMangaer.writeStartToLog(transaction2Id);
BatchUpdate update2 = new BatchUpdate(row2);
update2.put(col, val2);
logMangaer.writeUpdateToLog(transaction2Id, update2);
logMangaer.writeCommitToLog(transaction2Id);
BatchUpdate update3 = new BatchUpdate(row3);
update3.put(col, val3);
logMangaer.writeUpdateToLog(transaction1Id, update3);
logMangaer.writeAbortToLog(transaction1Id);
// log.completeCacheFlush(regionName, tableName, logSeqId);
log.close();
Path filename = log.computeFilename(log.getFilenum());
Map<Long, List<BatchUpdate>> commits = logMangaer.getCommitsFromLog(
filename, -1, null);
assertEquals(1, commits.size());
assertEquals(1, commits.get(transaction2Id).size());
}
// FIXME Cannot do this test without a global transaction manager
// public void testMissingCommit() {
// fail();
// }
// FIXME Cannot do this test without a global transaction manager
// public void testMissingAbort() {
// fail();
// }
}