HBASE-1858 Master can't split logs written by THBase

git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@819085 13f79535-47bb-0310-9956-ffa450edef68
Andrew Kyle Purtell 2009-09-26 01:26:55 +00:00
parent f51a5545b5
commit a95bbc96e4
12 changed files with 121 additions and 71 deletions

View File

@ -49,6 +49,8 @@ Release 0.21.0 - Unreleased
cacheBlocks
HBASE-1869 IndexedTable delete fails when used in conjunction with RowLock()
(Keith Thomas via Stack)
HBASE-1858 Master can't split logs created by THBase (Clint Morgan via
Andrew Purtell)
IMPROVEMENTS
HBASE-1760 Cleanup TODOs in HTable

View File

@ -24,6 +24,7 @@ import java.util.Random;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.Delete;
import org.apache.hadoop.hbase.client.Get;

View File

@ -44,16 +44,22 @@ public interface TransactionLogger {
*/
long createNewTransactionLog();
/**
/** Get the status of a transaction.
* @param transactionId
* @return transaction status
*/
TransactionStatus getStatusForTransaction(long transactionId);
/**
/** Set the status for a transaction.
* @param transactionId
* @param status
*/
void setStatusForTransaction(long transactionId, TransactionStatus status);
/** This transaction's state is no longer needed.
*
* @param transactionId
*/
void forgetTransaction(long transactionId);
}
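
For illustration only (not part of this patch): a minimal in-memory implementation of this interface, assuming the TransactionStatus enum has a PENDING value as the recovery code further down suggests, might look like the following sketch. The real global logger is HBaseBackedTransactionLogger.

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

class InMemoryTransactionLogger implements TransactionLogger {
  private final AtomicLong nextId = new AtomicLong();
  private final ConcurrentHashMap<Long, TransactionStatus> statusById =
      new ConcurrentHashMap<Long, TransactionStatus>();

  public long createNewTransactionLog() {
    long id = nextId.incrementAndGet();
    statusById.put(id, TransactionStatus.PENDING);
    return id;
  }

  public TransactionStatus getStatusForTransaction(long transactionId) {
    return statusById.get(transactionId);
  }

  public void setStatusForTransaction(long transactionId,
      TransactionStatus status) {
    statusById.put(transactionId, status);
  }

  // New in this change: once a transaction is fully resolved, its entry can
  // be purged so the global log does not grow without bound.
  public void forgetTransaction(long transactionId) {
    statusById.remove(transactionId);
  }
}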

View File

@ -152,6 +152,8 @@ public class TransactionManager {
if (status == TransactionalRegionInterface.COMMIT_OK) {
doCommit(transactionState);
} else if (status == TransactionalRegionInterface.COMMIT_OK_READ_ONLY) {
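// Read-only transactions wrote nothing, so their global log entry can be
// dropped immediately.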
transactionLogger.forgetTransaction(transactionState.getTransactionId());
}
LOG.debug("Committed transaction ["+transactionState.getTransactionId()+"] in ["+((System.currentTimeMillis()-startTime))+"]ms");
}
@ -187,7 +189,7 @@ public class TransactionManager {
}
throw new CommitUnsuccessfulException(e);
}
// TODO: Transaction log can be deleted now ...
transactionLogger.forgetTransaction(transactionState.getTransactionId());
}
/**
@ -230,6 +232,7 @@ public class TransactionManager {
+ "]. Ignoring.");
}
}
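// The abort is fully processed; the transaction's global state is no
// longer needed.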
transactionLogger.forgetTransaction(transactionState.getTransactionId());
}
public synchronized JtaXAResource getXAResource() {

View File

@ -43,6 +43,10 @@ TransactionalRegionServer. This is done by setting
<i>org.apache.hadoop.hbase.ipc.TransactionalRegionInterface</i> and
<i>hbase.regionserver.impl </i> to
<i>org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer</i>.
Additionally, to properly recover from the write-ahead log, the transactional log
key class must be registered by setting <i>hbase.regionserver.hlog.keyclass</i>
to <i>org.apache.hadoop.hbase.regionserver.transactional.THLogKey</i>.
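A hedged sketch of the equivalent programmatic configuration (the
<i>hbase.regionserver.class</i> property name is inferred from the surrounding
text; the other keys and values appear verbatim above):
<pre>
HBaseConfiguration conf = new HBaseConfiguration();
conf.set("hbase.regionserver.class",
    "org.apache.hadoop.hbase.ipc.TransactionalRegionInterface");
conf.set("hbase.regionserver.impl",
    "org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer");
// Without the key class registered, the master cannot deserialize THLog
// entries when splitting logs -- the failure this change addresses.
conf.set("hbase.regionserver.hlog.keyclass",
    "org.apache.hadoop.hbase.regionserver.transactional.THLogKey");
</pre>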
<p>
The read set claimed by a transactional scanner is determined from the start and

View File

@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver.transactional;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HRegionInfo;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
import org.apache.hadoop.hbase.client.transactional.TransactionLogger;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.SequenceFile;
@ -74,6 +75,10 @@ class THLogRecoveryManager {
/**
* Go through the WAL and look for transactions that were started but never
* completed. If such a transaction was globally committed, its edits will
* need to be applied.
*
* @param reconstructionLog
* @param maxSeqID
* @param reporter
@ -98,7 +103,7 @@ class THLogRecoveryManager {
}
SortedMap<Long, List<KeyValue>> pendingTransactionsById = new TreeMap<Long, List<KeyValue>>();
SortedMap<Long, List<KeyValue>> commitedTransactionsById = new TreeMap<Long, List<KeyValue>>();
Set<Long> commitedTransactions = new HashSet<Long>();
Set<Long> abortedTransactions = new HashSet<Long>();
SequenceFile.Reader logReader = new SequenceFile.Reader(fileSystem,
@ -118,8 +123,10 @@ class THLogRecoveryManager {
2000);
while (logReader.next(key, val)) {
LOG.debug("Processing edit: key: " + key.toString() + " val: "
+ val.toString());
if (LOG.isTraceEnabled()) {
LOG.trace("Processing edit: key: " + key.toString() + " val: "
+ val.toString());
}
if (key.getLogSeqNum() < maxSeqID) {
skippedEdits++;
continue;
@ -135,12 +142,12 @@ class THLogRecoveryManager {
case START:
if (updates != null || abortedTransactions.contains(transactionId)
|| commitedTransactionsById.containsKey(transactionId)) {
|| commitedTransactions.contains(transactionId)) {
LOG.error("Processing start for transaction: " + transactionId
+ ", but have already seen start message");
throw new IOException("Corrupted transaction log");
}
updates = new LinkedList<KeyValue>();
updates = new ArrayList<KeyValue>();
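// ArrayList is a better fit than LinkedList here: edits are only appended
// and then iterated, so the per-node overhead buys nothing.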
pendingTransactionsById.put(transactionId, updates);
startCount++;
break;
@ -179,18 +186,13 @@ class THLogRecoveryManager {
+ ", but also have abort message");
throw new IOException("Corrupted transaction log");
}
if (updates.size() == 0) {
LOG.warn("Transaction " + transactionId + " has no writes in log.");
}
if (commitedTransactionsById.containsKey(transactionId)) {
if (commitedTransactions.contains(transactionId)) {
LOG.error("Processing commit for transaction: " + transactionId
+ ", but have already commited transaction with that id");
throw new IOException("Corrupted transaction log");
}
pendingTransactionsById.remove(transactionId);
commitedTransactionsById.put(transactionId, updates);
commitedTransactions.add(transactionId);
commitCount++;
break;
default:
@ -213,23 +215,28 @@ class THLogRecoveryManager {
}
if (pendingTransactionsById.size() > 0) {
resolvePendingTransaction(pendingTransactionsById, commitedTransactionsById);
return resolvePendingTransaction(pendingTransactionsById);
}
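// Transactions whose commit record made it into the WAL are replayed by the
// normal reconstruction pass, so nothing needs to be returned for them;
// only unresolved pending transactions require edits.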
return commitedTransactionsById;
return null;
}
private void resolvePendingTransaction(
SortedMap<Long, List<KeyValue>> pendingTransactionsById,
SortedMap<Long, List<KeyValue>> commitedTransactionsById) {
private SortedMap<Long, List<KeyValue>> resolvePendingTransaction(
SortedMap<Long, List<KeyValue>> pendingTransactionsById
) {
SortedMap<Long, List<KeyValue>> commitedTransactionsById = new TreeMap<Long, List<KeyValue>>();
LOG.info("Region log has " + pendingTransactionsById.size()
+ " unfinished transactions. Going to the transaction log to resolve");
for (Entry<Long, List<KeyValue>> entry : pendingTransactionsById
.entrySet()) {
for (Entry<Long, List<KeyValue>> entry : pendingTransactionsById.entrySet()) {
if (entry.getValue().isEmpty()) {
LOG.debug("Skipping resolution of transaction [" + entry.getKey() + "]: it has no writes.");
}
TransactionLogger.TransactionStatus transactionStatus = getGlobalTransactionLog()
.getStatusForTransaction(entry.getKey());
if (transactionStatus == null) {
throw new RuntimeException("Cannot resolve transaction ["
+ entry.getKey() + "] from global tx log.");
@ -241,12 +248,28 @@ class THLogRecoveryManager {
commitedTransactionsById.put(entry.getKey(), entry.getValue());
break;
case PENDING:
LOG.warn("Transaction [" + entry.getKey()
+ "] is still pending. Assuming it will not commit."
+ " If it eventually does commit, then we lose transactional semantics.");
// TODO this could possibly be handled by waiting and seeing what happens.
break;
}
}
return commitedTransactionsById;
}
private TransactionLogger getGlobalTransactionLog() {
return null;
private TransactionLogger globalTransactionLog = null;
private synchronized TransactionLogger getGlobalTransactionLog() {
if (globalTransactionLog == null) {
try {
globalTransactionLog = new HBaseBackedTransactionLogger();
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return globalTransactionLog;
}
}

View File

@ -20,6 +20,8 @@
package org.apache.hadoop.hbase.regionserver.transactional;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedList;
@ -163,9 +165,10 @@ class TransactionState {
// TODO take deletes into account as well
List<KeyValue> localKVs = new LinkedList<KeyValue>();
for (Put put : puts) {
List<KeyValue> localKVs = new ArrayList<KeyValue>();
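// Walk the puts newest-first so a read within the transaction sees the
// most recent write to a given row.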
List<Put> reversedPuts = new ArrayList<Put>(puts);
Collections.reverse(reversedPuts);
for (Put put : reversedPuts) {
if (!Bytes.equals(get.getRow(), put.getRow())) {
continue;
}

View File

@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.client.Get;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger;
import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException;
import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface;
import org.apache.hadoop.hbase.regionserver.FlushRequester;
@ -131,13 +132,18 @@ public class TransactionalRegion extends HRegion {
throws UnsupportedEncodingException, IOException {
super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter);
// We need not do anything with the Trx Log table; it is not transactional.
if (super.getTableDesc().getNameAsString().equals(HBaseBackedTransactionLogger.TABLE_NAME)) {
return;
}
THLogRecoveryManager recoveryManager = new THLogRecoveryManager(this);
Map<Long, List<KeyValue>> commitedTransactionsById = recoveryManager
.getCommitsFromLog(oldLogFile, minSeqId, reporter);
if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) {
LOG.debug("found " + commitedTransactionsById.size()
+ " COMMITED transactions");
+ " COMMITED transactions to recover.");
for (Entry<Long, List<KeyValue>> entry : commitedTransactionsById
.entrySet()) {
@ -150,12 +156,11 @@ public class TransactionalRegion extends HRegion {
}
}
// LOG.debug("Flushing cache"); // We must trigger a cache flush,
// otherwise
// we will would ignore the log on subsequent failure
// if (!super.flushcache()) {
// LOG.warn("Did not flush cache");
// }
LOG.debug("Flushing cache"); // We must trigger a cache flush,
//otherwise we will would ignore the log on subsequent failure
if (!super.flushcache()) {
LOG.warn("Did not flush cache");
}
}
}
@ -362,6 +367,7 @@ public class TransactionalRegion extends HRegion {
}
// Otherwise we were read-only and committable, so we can forget it.
state.setStatus(Status.COMMITED);
this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
retireTransaction(state);
return TransactionalRegionInterface.COMMIT_OK_READ_ONLY;
}
@ -459,16 +465,20 @@ public class TransactionalRegion extends HRegion {
LOG.debug("Commiting transaction: " + state.toString() + " to "
+ super.getRegionInfo().getRegionNameAsString());
this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
// FIXME: potential mix-up here if some deletes should come before the puts.
for (Put update : state.getPuts()) {
this.put(update, false); // Don't need to WAL these
this.put(update, true);
}
for (Delete delete : state.getDeleteSet()) {
this.delete(delete, null, false);
this.delete(delete, null, true);
}
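// Writing these edits through the WAL (the flag flipped from false to true
// above) is part of the fix: a master splitting the log can now recover
// committed transactional mutations.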
// Now that the transaction lives in the WAL, we can write a commit record
// to the log so we don't have to recover it.
this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId());
state.setStatus(Status.COMMITED);
if (state.hasWrite()
&& !commitPendingTransactions.remove(state)) {

View File

@ -105,23 +105,7 @@ public class TestTHLog extends HBaseTestCase implements
Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
filename, -1, null);
assertEquals(1, commits.size());
assertTrue(commits.containsKey(transactionId));
assertEquals(3, commits.get(transactionId).size());
List<KeyValue> updates = commits.get(transactionId);
KeyValue update1 = updates.get(0);
assertTrue(Bytes.equals(row1, update1.getRow()));
assertTrue(Bytes.equals(val1, update1.getValue()));
KeyValue update2 = updates.get(1);
assertTrue(Bytes.equals(row2, update2.getRow()));
assertTrue(Bytes.equals(val2, update2.getValue()));
KeyValue update3 = updates.get(2);
assertTrue(Bytes.equals(row3, update3.getRow()));
assertTrue(Bytes.equals(val3, update3.getValue()));
assertNull(commits);
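// Under the new recovery semantics, edits for transactions that committed
// in the WAL are replayed by normal reconstruction, so getCommitsFromLog
// is expected to return null here.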
}
@ -153,7 +137,7 @@ public class TestTHLog extends HBaseTestCase implements
Map<Long, List<KeyValue>> commits = logRecoveryMangaer.getCommitsFromLog(
filename, -1, null);
assertEquals(0, commits.size());
assertNull(commits);
}
/**
@ -190,9 +174,7 @@ public class TestTHLog extends HBaseTestCase implements
Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
-1, null);
assertEquals(2, commits.size());
assertEquals(2, commits.get(transaction1Id).size());
assertEquals(1, commits.get(transaction2Id).size());
assertNull(commits);
}
/**
@ -229,8 +211,7 @@ public class TestTHLog extends HBaseTestCase implements
Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
-1, null);
assertEquals(1, commits.size());
assertEquals(2, commits.get(transaction1Id).size());
assertNull(commits);
}
/**
@ -267,8 +248,7 @@ public class TestTHLog extends HBaseTestCase implements
Map<Long, List<KeyValue>> commits = logMangaer.getCommitsFromLog(filename,
-1, null);
assertEquals(1, commits.size());
assertEquals(1, commits.get(transaction2Id).size());
assertNull(commits);
}
// FIXME Cannot do this test without a global transaction manager

View File

@ -997,6 +997,9 @@ public class KeyValue implements Writable, HeapSize {
* @return True if matching families.
*/
public boolean matchingFamily(final byte [] family) {
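// Guard against an empty KeyValue (e.g. one freshly instantiated as a
// SequenceFile read target), which has no family bytes to compare.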
if (this.length == 0 || this.bytes.length == 0) {
return false;
}
int o = getFamilyOffset();
int l = getFamilyLength(o);
return Bytes.compareTo(family, 0, family.length, this.bytes, o, l) == 0;

View File

@ -824,6 +824,22 @@ public class HLog implements HConstants, Syncable {
}
}
static Class<? extends HLogKey> getKeyClass(HBaseConfiguration conf) {
return (Class<? extends HLogKey>) conf
.getClass("hbase.regionserver.hlog.keyclass", HLogKey.class);
}
static HLogKey newKey(HBaseConfiguration conf) throws IOException {
Class<? extends HLogKey> keyClass = getKeyClass(conf);
try {
return keyClass.newInstance();
} catch (InstantiationException e) {
throw new IOException("cannot create hlog key");
} catch (IllegalAccessException e) {
throw new IOException("cannot create hlog key");
}
}
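// Illustration only (not part of this patch): how a same-package caller
// could exercise the pluggable key class. setClass() is the standard Hadoop
// Configuration API; THLogKey (from the transactional contrib) extends
// HLogKey.
static HLogKey newTHLogKeyExample() throws IOException {
HBaseConfiguration conf = new HBaseConfiguration();
conf.setClass("hbase.regionserver.hlog.keyclass",
org.apache.hadoop.hbase.regionserver.transactional.THLogKey.class,
HLogKey.class);
return newKey(conf); // returns a THLogKey instance
}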
/*
* @param rootDir
* @param logfiles
@ -875,7 +891,7 @@ public class HLog implements HConstants, Syncable {
try {
in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
try {
HLogKey key = new HLogKey();
HLogKey key = newKey(conf);
KeyValue val = new KeyValue();
while (in.next(key, val)) {
byte [] regionName = key.getRegionName();
@ -890,7 +906,7 @@ public class HLog implements HConstants, Syncable {
count++;
// Make the key and value new each time; otherwise same instance
// is used over and over.
key = new HLogKey();
key = newKey(conf);
val = new KeyValue();
}
LOG.debug("Pushed=" + count + " entries from " +
@ -960,7 +976,7 @@ public class HLog implements HConstants, Syncable {
}
SequenceFile.Writer w =
SequenceFile.createWriter(fs, conf, logfile,
HLogKey.class, KeyValue.class, getCompressionType(conf));
getKeyClass(conf), KeyValue.class, getCompressionType(conf));
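// Using getKeyClass(conf) rather than the hardcoded HLogKey.class is what
// lets the master split logs that contain THLogKey entries.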
wap = new WriterAndPath(logfile, w);
logWriters.put(key, wap);
if (LOG.isDebugEnabled()) {
@ -970,7 +986,7 @@ public class HLog implements HConstants, Syncable {
if (old != null) {
// Copy from existing log file
HLogKey oldkey = new HLogKey();
HLogKey oldkey = newKey(conf);
KeyValue oldval = new KeyValue();
for (; old.next(oldkey, oldval); count++) {
if (LOG.isDebugEnabled() && count > 0

View File

@ -309,7 +309,7 @@ public class Store implements HConstants, HeapSize {
SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs,
reconstructionLog, this.conf);
try {
HLogKey key = new HLogKey();
HLogKey key = HLog.newKey(conf);
KeyValue val = new KeyValue();
long skippedEdits = 0;
long editsCount = 0;
@ -327,8 +327,7 @@ public class Store implements HConstants, HeapSize {
}
// Check this edit is for me. Also, guard against writing the special
// METACOLUMN info such as HBASE::CACHEFLUSH entries
if (/* commented out for now - stack via jgray key.isTransactionEntry() || */
val.matchingFamily(HLog.METAFAMILY) ||
if (val.matchingFamily(HLog.METAFAMILY) ||
!Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) ||
!val.matchingFamily(family.getName())) {
continue;