From a95bbc96e4b67575a8553ba50b8b7d653e6f6772 Mon Sep 17 00:00:00 2001 From: Andrew Kyle Purtell Date: Sat, 26 Sep 2009 01:26:55 +0000 Subject: [PATCH] HBASE-1848 Master can't split logs written by THBase git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@819085 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 2 + .../HBaseBackedTransactionLogger.java | 1 + .../transactional/TransactionLogger.java | 10 ++- .../transactional/TransactionManager.java | 5 +- .../hbase/client/transactional/package.html | 4 ++ .../transactional/THLogRecoveryManager.java | 67 +++++++++++++------ .../transactional/TransactionState.java | 9 ++- .../transactional/TransactionalRegion.java | 32 ++++++--- .../regionserver/transactional/TestTHLog.java | 30 ++------- .../org/apache/hadoop/hbase/KeyValue.java | 3 + .../hadoop/hbase/regionserver/HLog.java | 24 +++++-- .../hadoop/hbase/regionserver/Store.java | 5 +- 12 files changed, 121 insertions(+), 71 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 5c58052c973..4c3131ef158 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -49,6 +49,8 @@ Release 0.21.0 - Unreleased cacheBlocks HBASE-1869 IndexedTable delete fails when used in conjunction with RowLock() (Keith Thomas via Stack) + HBASE-1858 Master can't split logs created by THBase (Clint Morgan via + Andrew Purtell) IMPROVEMENTS HBASE-1760 Cleanup TODOs in HTable diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java index 17ac025aec2..3bf0fbc50be 100644 --- a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java +++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/HBaseBackedTransactionLogger.java @@ -24,6 +24,7 @@ import java.util.Random; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HColumnDescriptor; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Get; diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java index c17763c42c2..2d57d21488c 100644 --- a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java +++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionLogger.java @@ -44,16 +44,22 @@ public interface TransactionLogger { */ long createNewTransactionLog(); - /** + /** Get the status of a transaction. * @param transactionId * @return transaction status */ TransactionStatus getStatusForTransaction(long transactionId); - /** + /** Set the status for a transaction. * @param transactionId * @param status */ void setStatusForTransaction(long transactionId, TransactionStatus status); + + /** This transaction's state is no longer needed. + * + * @param transactionId + */ + void forgetTransaction(long transactionId); } diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java index 5bb5517d769..04c5a439a63 100644 --- a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java +++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/TransactionManager.java @@ -152,6 +152,8 @@ public class TransactionManager { if (status == TransactionalRegionInterface.COMMIT_OK) { doCommit(transactionState); + } else if (status == TransactionalRegionInterface.COMMIT_OK_READ_ONLY) { + transactionLogger.forgetTransaction(transactionState.getTransactionId()); } LOG.debug("Committed transaction ["+transactionState.getTransactionId()+"] in ["+((System.currentTimeMillis()-startTime))+"]ms"); } @@ -187,7 +189,7 @@ public class TransactionManager { } throw new CommitUnsuccessfulException(e); } - // TODO: Transaction log can be deleted now ... + transactionLogger.forgetTransaction(transactionState.getTransactionId()); } /** @@ -230,6 +232,7 @@ public class TransactionManager { + "]. Ignoring."); } } + transactionLogger.forgetTransaction(transactionState.getTransactionId()); } public synchronized JtaXAResource getXAResource() { diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/package.html b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/package.html index 312d164452d..3c83fa6ca7a 100644 --- a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/package.html +++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/client/transactional/package.html @@ -43,6 +43,10 @@ TransactionalRegionServer. This is done by setting org.apache.hadoop.hbase.ipc.TransactionalRegionInterface and hbase.regionserver.impl to org.apache.hadoop.hbase.regionserver.transactional.TransactionalRegionServer +Additionally, to properly recover from the write-ahead-log, the transactional log +key class must be registered by setting hbase.regionserver.hlog.keyclass +to org.apache.hadoop.hbase.regionserver.transactional.THLogKey +

The read set claimed by a transactional scanner is determined from the start and diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java index b843b492d11..dfeb029500a 100644 --- a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java +++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/THLogRecoveryManager.java @@ -21,8 +21,8 @@ package org.apache.hadoop.hbase.regionserver.transactional; import java.io.IOException; import java.io.UnsupportedEncodingException; +import java.util.ArrayList; import java.util.HashSet; -import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; @@ -38,6 +38,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger; import org.apache.hadoop.hbase.client.transactional.TransactionLogger; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.io.SequenceFile; @@ -74,6 +75,10 @@ class THLogRecoveryManager { /** + * Go through the WAL, and look for transactions that were started, but never + * completed. If the transaction was committed, then those edits will need to + * be applied. + * * @param reconstructionLog * @param maxSeqID * @param reporter @@ -98,7 +103,7 @@ class THLogRecoveryManager { } SortedMap> pendingTransactionsById = new TreeMap>(); - SortedMap> commitedTransactionsById = new TreeMap>(); + Set commitedTransactions = new HashSet(); Set abortedTransactions = new HashSet(); SequenceFile.Reader logReader = new SequenceFile.Reader(fileSystem, @@ -118,8 +123,10 @@ class THLogRecoveryManager { 2000); while (logReader.next(key, val)) { - LOG.debug("Processing edit: key: " + key.toString() + " val: " - + val.toString()); + if (LOG.isTraceEnabled()) { + LOG.trace("Processing edit: key: " + key.toString() + " val: " + + val.toString()); + } if (key.getLogSeqNum() < maxSeqID) { skippedEdits++; continue; @@ -135,12 +142,12 @@ class THLogRecoveryManager { case START: if (updates != null || abortedTransactions.contains(transactionId) - || commitedTransactionsById.containsKey(transactionId)) { + || commitedTransactions.contains(transactionId)) { LOG.error("Processing start for transaction: " + transactionId + ", but have already seen start message"); throw new IOException("Corrupted transaction log"); } - updates = new LinkedList(); + updates = new ArrayList(); pendingTransactionsById.put(transactionId, updates); startCount++; break; @@ -179,18 +186,13 @@ class THLogRecoveryManager { + ", but also have abort message"); throw new IOException("Corrupted transaction log"); } - if (updates.size() == 0) { - LOG - .warn("Transaciton " + transactionId - + " has no writes in log. "); - } - if (commitedTransactionsById.containsKey(transactionId)) { + if (commitedTransactions.contains(transactionId)) { LOG.error("Processing commit for transaction: " + transactionId + ", but have already commited transaction with that id"); throw new IOException("Corrupted transaction log"); } pendingTransactionsById.remove(transactionId); - commitedTransactionsById.put(transactionId, updates); + commitedTransactions.add(transactionId); commitCount++; break; default: @@ -213,23 +215,28 @@ class THLogRecoveryManager { } if (pendingTransactionsById.size() > 0) { - resolvePendingTransaction(pendingTransactionsById, commitedTransactionsById); + return resolvePendingTransaction(pendingTransactionsById); } - return commitedTransactionsById; + return null; } - private void resolvePendingTransaction( - SortedMap> pendingTransactionsById, - SortedMap> commitedTransactionsById) { + private SortedMap> resolvePendingTransaction( + SortedMap> pendingTransactionsById + ) { + SortedMap> commitedTransactionsById = new TreeMap>(); + LOG.info("Region log has " + pendingTransactionsById.size() + " unfinished transactions. Going to the transaction log to resolve"); - for (Entry> entry : pendingTransactionsById - .entrySet()) { + for (Entry> entry : pendingTransactionsById.entrySet()) { + if (entry.getValue().isEmpty()) { + LOG.debug("Skipping resolving trx ["+entry.getKey()+"] has no writes."); + } TransactionLogger.TransactionStatus transactionStatus = getGlobalTransactionLog() .getStatusForTransaction(entry.getKey()); + if (transactionStatus == null) { throw new RuntimeException("Cannot resolve tranasction [" + entry.getKey() + "] from global tx log."); @@ -241,12 +248,28 @@ class THLogRecoveryManager { commitedTransactionsById.put(entry.getKey(), entry.getValue()); break; case PENDING: + LOG + .warn("Transaction [" + + entry.getKey() + + "] is still pending. Asumming it will not commit." + + " If it eventually does commit, then we loose transactional semantics."); + // TODO this could possibly be handled by waiting and seeing what happens. break; } } + return commitedTransactionsById; } - private TransactionLogger getGlobalTransactionLog() { - return null; + private TransactionLogger globalTransactionLog = null; + + private synchronized TransactionLogger getGlobalTransactionLog() { + if (globalTransactionLog == null) { + try { + globalTransactionLog = new HBaseBackedTransactionLogger(); + } catch (IOException e) { + throw new RuntimeException(e); + } + } + return globalTransactionLog; } } diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java index 2d4b732d337..3386de3693f 100644 --- a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java +++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionState.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hbase.regionserver.transactional; import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; import java.util.HashSet; import java.util.Iterator; import java.util.LinkedList; @@ -163,9 +165,10 @@ class TransactionState { // TODO take deletes into account as well - List localKVs = new LinkedList(); - - for (Put put : puts) { + List localKVs = new ArrayList(); + List reversedPuts = new ArrayList(puts); + Collections.reverse(reversedPuts); + for (Put put : reversedPuts) { if (!Bytes.equals(get.getRow(), put.getRow())) { continue; } diff --git a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java index 3f5bc433f16..0a11b124c0a 100644 --- a/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java +++ b/src/contrib/transactional/src/java/org/apache/hadoop/hbase/regionserver/transactional/TransactionalRegion.java @@ -51,6 +51,7 @@ import org.apache.hadoop.hbase.client.Get; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.client.transactional.HBaseBackedTransactionLogger; import org.apache.hadoop.hbase.client.transactional.UnknownTransactionException; import org.apache.hadoop.hbase.ipc.TransactionalRegionInterface; import org.apache.hadoop.hbase.regionserver.FlushRequester; @@ -130,14 +131,19 @@ public class TransactionalRegion extends HRegion { final long minSeqId, final long maxSeqId, final Progressable reporter) throws UnsupportedEncodingException, IOException { super.doReconstructionLog(oldLogFile, minSeqId, maxSeqId, reporter); - + + // We can ignore doing anything with the Trx Log table, it is not-transactional. + if (super.getTableDesc().getNameAsString().equals(HBaseBackedTransactionLogger.TABLE_NAME)) { + return; + } + THLogRecoveryManager recoveryManager = new THLogRecoveryManager(this); Map> commitedTransactionsById = recoveryManager .getCommitsFromLog(oldLogFile, minSeqId, reporter); if (commitedTransactionsById != null && commitedTransactionsById.size() > 0) { LOG.debug("found " + commitedTransactionsById.size() - + " COMMITED transactions"); + + " COMMITED transactions to recover."); for (Entry> entry : commitedTransactionsById .entrySet()) { @@ -150,12 +156,11 @@ public class TransactionalRegion extends HRegion { } } - // LOG.debug("Flushing cache"); // We must trigger a cache flush, - // otherwise - // we will would ignore the log on subsequent failure - // if (!super.flushcache()) { - // LOG.warn("Did not flush cache"); - // } + LOG.debug("Flushing cache"); // We must trigger a cache flush, + //otherwise we will would ignore the log on subsequent failure + if (!super.flushcache()) { + LOG.warn("Did not flush cache"); + } } } @@ -362,6 +367,7 @@ public class TransactionalRegion extends HRegion { } // Otherwise we were read-only and commitable, so we can forget it. state.setStatus(Status.COMMITED); + this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId()); retireTransaction(state); return TransactionalRegionInterface.COMMIT_OK_READ_ONLY; } @@ -459,15 +465,19 @@ public class TransactionalRegion extends HRegion { LOG.debug("Commiting transaction: " + state.toString() + " to " + super.getRegionInfo().getRegionNameAsString()); - this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId()); + // FIXME potential mix up here if some deletes should come before the puts. for (Put update : state.getPuts()) { - this.put(update, false); // Don't need to WAL these + this.put(update, true); } for (Delete delete : state.getDeleteSet()) { - this.delete(delete, null, false); + this.delete(delete, null, true); } + + // Now the transaction lives in the WAL, we can writa a commit to the log + // so we don't have to recover it. + this.hlog.writeCommitToLog(super.getRegionInfo(), state.getTransactionId()); state.setStatus(Status.COMMITED); if (state.hasWrite() diff --git a/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java b/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java index 8056385d73b..708a620e720 100644 --- a/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java +++ b/src/contrib/transactional/src/test/org/apache/hadoop/hbase/regionserver/transactional/TestTHLog.java @@ -105,23 +105,7 @@ public class TestTHLog extends HBaseTestCase implements Map> commits = logRecoveryMangaer.getCommitsFromLog( filename, -1, null); - assertEquals(1, commits.size()); - assertTrue(commits.containsKey(transactionId)); - assertEquals(3, commits.get(transactionId).size()); - - List updates = commits.get(transactionId); - - KeyValue update1 = updates.get(0); - assertTrue(Bytes.equals(row1, update1.getRow())); - assertTrue(Bytes.equals(val1, update1.getValue())); - - KeyValue update2 = updates.get(1); - assertTrue(Bytes.equals(row2, update2.getRow())); - assertTrue(Bytes.equals(val2, update2.getValue())); - - KeyValue update3 = updates.get(2); - assertTrue(Bytes.equals(row3, update3.getRow())); - assertTrue(Bytes.equals(val3, update3.getValue())); + assertNull(commits); } @@ -153,7 +137,7 @@ public class TestTHLog extends HBaseTestCase implements Map> commits = logRecoveryMangaer.getCommitsFromLog( filename, -1, null); - assertEquals(0, commits.size()); + assertNull(commits); } /** @@ -190,9 +174,7 @@ public class TestTHLog extends HBaseTestCase implements Map> commits = logMangaer.getCommitsFromLog(filename, -1, null); - assertEquals(2, commits.size()); - assertEquals(2, commits.get(transaction1Id).size()); - assertEquals(1, commits.get(transaction2Id).size()); + assertNull(commits); } /** @@ -229,8 +211,7 @@ public class TestTHLog extends HBaseTestCase implements Map> commits = logMangaer.getCommitsFromLog(filename, -1, null); - assertEquals(1, commits.size()); - assertEquals(2, commits.get(transaction1Id).size()); + assertNull(commits); } /** @@ -267,8 +248,7 @@ public class TestTHLog extends HBaseTestCase implements Map> commits = logMangaer.getCommitsFromLog(filename, -1, null); - assertEquals(1, commits.size()); - assertEquals(1, commits.get(transaction2Id).size()); + assertNull(commits); } // FIXME Cannot do this test without a global transacton manager diff --git a/src/java/org/apache/hadoop/hbase/KeyValue.java b/src/java/org/apache/hadoop/hbase/KeyValue.java index 83683544c75..7a3cf45d5bc 100644 --- a/src/java/org/apache/hadoop/hbase/KeyValue.java +++ b/src/java/org/apache/hadoop/hbase/KeyValue.java @@ -997,6 +997,9 @@ public class KeyValue implements Writable, HeapSize { * @return True if matching families. */ public boolean matchingFamily(final byte [] family) { + if (this.length == 0 || this.bytes.length == 0) { + return false; + } int o = getFamilyOffset(); int l = getFamilyLength(o); return Bytes.compareTo(family, 0, family.length, this.bytes, o, l) == 0; diff --git a/src/java/org/apache/hadoop/hbase/regionserver/HLog.java b/src/java/org/apache/hadoop/hbase/regionserver/HLog.java index 819ba48abff..a1910b939f6 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/HLog.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/HLog.java @@ -823,6 +823,22 @@ public class HLog implements HConstants, Syncable { this.w = w; } } + + static Class getKeyClass(HBaseConfiguration conf) { + return (Class) conf + .getClass("hbase.regionserver.hlog.keyclass", HLogKey.class); + } + + static HLogKey newKey(HBaseConfiguration conf) throws IOException { + Class keyClass = getKeyClass(conf); + try { + return keyClass.newInstance(); + } catch (InstantiationException e) { + throw new IOException("cannot create hlog key"); + } catch (IllegalAccessException e) { + throw new IOException("cannot create hlog key"); + } + } /* * @param rootDir @@ -875,7 +891,7 @@ public class HLog implements HConstants, Syncable { try { in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf); try { - HLogKey key = new HLogKey(); + HLogKey key = newKey(conf); KeyValue val = new KeyValue(); while (in.next(key, val)) { byte [] regionName = key.getRegionName(); @@ -890,7 +906,7 @@ public class HLog implements HConstants, Syncable { count++; // Make the key and value new each time; otherwise same instance // is used over and over. - key = new HLogKey(); + key = newKey(conf); val = new KeyValue(); } LOG.debug("Pushed=" + count + " entries from " + @@ -960,7 +976,7 @@ public class HLog implements HConstants, Syncable { } SequenceFile.Writer w = SequenceFile.createWriter(fs, conf, logfile, - HLogKey.class, KeyValue.class, getCompressionType(conf)); + getKeyClass(conf), KeyValue.class, getCompressionType(conf)); wap = new WriterAndPath(logfile, w); logWriters.put(key, wap); if (LOG.isDebugEnabled()) { @@ -970,7 +986,7 @@ public class HLog implements HConstants, Syncable { if (old != null) { // Copy from existing log file - HLogKey oldkey = new HLogKey(); + HLogKey oldkey = newKey(conf); KeyValue oldval = new KeyValue(); for (; old.next(oldkey, oldval); count++) { if (LOG.isDebugEnabled() && count > 0 diff --git a/src/java/org/apache/hadoop/hbase/regionserver/Store.java b/src/java/org/apache/hadoop/hbase/regionserver/Store.java index c1f4c64032d..b00fdbf35f3 100644 --- a/src/java/org/apache/hadoop/hbase/regionserver/Store.java +++ b/src/java/org/apache/hadoop/hbase/regionserver/Store.java @@ -309,7 +309,7 @@ public class Store implements HConstants, HeapSize { SequenceFile.Reader logReader = new SequenceFile.Reader(this.fs, reconstructionLog, this.conf); try { - HLogKey key = new HLogKey(); + HLogKey key = HLog.newKey(conf); KeyValue val = new KeyValue(); long skippedEdits = 0; long editsCount = 0; @@ -327,8 +327,7 @@ public class Store implements HConstants, HeapSize { } // Check this edit is for me. Also, guard against writing the special // METACOLUMN info such as HBASE::CACHEFLUSH entries - if (/* commented out for now - stack via jgray key.isTransactionEntry() || */ - val.matchingFamily(HLog.METAFAMILY) || + if (val.matchingFamily(HLog.METAFAMILY) || !Bytes.equals(key.getRegionName(), region.regionInfo.getRegionName()) || !val.matchingFamily(family.getName())) { continue;