diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 83127d2aa3e..2ca1eb0794b 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -6239,7 +6239,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi b.getRegionFileSystem().logFileSystemState(LOG); } - RegionMergeTransaction rmt = new RegionMergeTransaction(a, b, true); + RegionMergeTransactionImpl rmt = new RegionMergeTransactionImpl(a, b, true); if (!rmt.prepare(null)) { throw new IOException("Unable to merge regions " + a + " and " + b); } @@ -6252,7 +6252,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi + Bytes.toStringBinary(mergedRegionInfo.getEndKey()) + ">"); HRegion dstRegion; try { - dstRegion = rmt.execute(null, null); + dstRegion = (HRegion)rmt.execute(null, null); } catch (IOException ioe) { rmt.rollback(null, null); throw new IOException("Failed merging region " + a + " and " + b diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java index 441a93bfa7e..ed9550e46b8 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/Region.java @@ -23,9 +23,7 @@ import java.util.List; import java.util.Map; import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.DroppedSnapshotException; import org.apache.hadoop.hbase.HBaseInterfaceAudience; -import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.HTableDescriptor; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java index ffa98cde82c..534d01df36f 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeRequest.java @@ -65,7 +65,7 @@ class RegionMergeRequest implements Runnable { } try { final long startTime = EnvironmentEdgeManager.currentTime(); - RegionMergeTransaction mt = new RegionMergeTransaction(region_a, + RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, region_b, forcible); //acquire a shared read lock on the table, so that table schema modifications diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java index 17ab887e8a1..72f0e892fe2 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransaction.java @@ -13,35 +13,20 @@ * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitationsME + * License for the specific language governing permissions and limitations * under the License. 
*/ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.util.ArrayList; import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.regionserver.SplitTransaction.TransactionListener; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.HRegionInfo; -import org.apache.hadoop.hbase.MetaMutationAnnotation; -import org.apache.hadoop.hbase.MetaTableAccessor; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.regionserver.SplitTransaction.LoggingProgressable; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.Pair; /** * Executes region merge as a "transaction". It is similar with @@ -50,12 +35,21 @@ import org.apache.hadoop.hbase.util.Pair; * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if * execute fails. * - *

- * Here is an example of how you would use this class: - * + *

Here is an example of how you would use this interface: *

- *  RegionMergeTransaction mt = new RegionMergeTransaction(this.conf, parent, midKey)
- *  if (!mt.prepare(services)) return;
+ *  RegionMergeTransactionFactory factory = new RegionMergeTransactionFactory(conf);
+ *  RegionMergeTransaction mt = factory.create(regionA, regionB, forcible)
+ *    .registerTransactionListener(new TransactionListener() {
+ *       public void transition(RegionMergeTransaction transaction,
+ *         RegionMergeTransactionPhase from, RegionMergeTransactionPhase to) throws IOException {
+ *         // ...
+ *       }
+ *       public void rollback(RegionMergeTransaction transaction,
+ *         RegionMergeTransactionPhase from, RegionMergeTransactionPhase to) {
+ *         // ...
+ *       }
+ *    });
+ *  if (!mt.prepare(services)) return;
  *  try {
  *    mt.execute(server, services);
  *  } catch (IOException ioe) {
@@ -63,33 +57,25 @@ import org.apache.hadoop.hbase.util.Pair;
  *      mt.rollback(server, services);
  *      return;
  *    } catch (RuntimeException e) {
- *      myAbortable.abort("Failed merge, abort");
+ *      // abort the server
  *    }
  *  }
  * 
- *

- * This class is not thread safe. Caller needs ensure merge is run by one thread - * only. + *

A merge transaction is not thread safe. Callers must ensure a split is run by + * one thread only. */ -@InterfaceAudience.Private -public class RegionMergeTransaction { - private static final Log LOG = LogFactory.getLog(RegionMergeTransaction.class); - - // Merged region info - private HRegionInfo mergedRegionInfo; - // region_a sorts before region_b - private final HRegion region_a; - private final HRegion region_b; - // merges dir is under region_a - private final Path mergesdir; - // We only merge adjacent regions if forcible is false - private final boolean forcible; - +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface RegionMergeTransaction { /** - * Types to add to the transaction journal. Each enum is a step in the merge - * transaction. Used to figure how much we need to rollback. + * Each enum is a step in the merge transaction. */ - enum JournalEntry { + enum RegionMergeTransactionPhase { + STARTED, + /** + * Prepared + */ + PREPARED, /** * Set region as in transition, set it into MERGING state. */ @@ -122,95 +108,59 @@ public class RegionMergeTransaction { * Point of no return. If we got here, then transaction is not recoverable * other than by crashing out the regionserver. */ - PONR - } - - /* - * Journal of how far the merge transaction has progressed. 
- */ - private final List journal = new ArrayList(); - - private static IOException closedByOtherException = new IOException( - "Failed to close region: already closed by another thread"); - - private RegionServerCoprocessorHost rsCoprocessorHost = null; - - /** - * Constructor - * @param a region a to merge - * @param b region b to merge - * @param forcible if false, we will only merge adjacent regions - */ - public RegionMergeTransaction(final Region a, final Region b, - final boolean forcible) { - if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) { - this.region_a = (HRegion)a; - this.region_b = (HRegion)b; - } else { - this.region_a = (HRegion)b; - this.region_b = (HRegion)a; - } - this.forcible = forcible; - this.mergesdir = region_a.getRegionFileSystem().getMergesDir(); + PONR, + /** + * Completed + */ + COMPLETED } /** - * Does checks on merge inputs. + * Split transaction journal entry + */ + public interface JournalEntry { + + /** @return the completed phase marked by this journal entry */ + RegionMergeTransactionPhase getPhase(); + + /** @return the time of phase completion */ + long getTimeStamp(); + } + + /** + * Split transaction listener + */ + public interface TransactionListener { + + /** + * Invoked when transitioning forward from one transaction phase to another + * @param transaction the transaction + * @param from the current phase + * @param to the next phase + * @throws IOException listener can throw this to abort + */ + void transition(RegionMergeTransaction transaction, RegionMergeTransactionPhase from, + RegionMergeTransactionPhase to) throws IOException; + + /** + * Invoked when rolling back a transaction from one transaction phase to the + * previous + * @param transaction the transaction + * @param from the current phase + * @param to the previous phase + */ + void rollback(RegionMergeTransaction transaction, RegionMergeTransactionPhase from, + RegionMergeTransactionPhase to); + } + + /** + * Check merge inputs and prepare the 
transaction. * @param services * @return true if the regions are mergeable else * false if they are not (e.g. its already closed, etc.). + * @throws IOException */ - public boolean prepare(final RegionServerServices services) { - if (!region_a.getTableDesc().getTableName() - .equals(region_b.getTableDesc().getTableName())) { - LOG.info("Can't merge regions " + region_a + "," + region_b - + " because they do not belong to the same table"); - return false; - } - if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) { - LOG.info("Can't merge the same region " + region_a); - return false; - } - if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(), - region_b.getRegionInfo())) { - String msg = "Skip merging " + this.region_a.getRegionInfo().getRegionNameAsString() - + " and " + this.region_b.getRegionInfo().getRegionNameAsString() - + ", because they are not adjacent."; - LOG.info(msg); - return false; - } - if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) { - return false; - } - try { - boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services, - region_a.getRegionInfo().getRegionName()); - if (regionAHasMergeQualifier || - hasMergeQualifierInMeta(services, region_b.getRegionInfo().getRegionName())) { - LOG.debug("Region " + (regionAHasMergeQualifier ? 
- region_a.getRegionInfo().getRegionNameAsString() : - region_b.getRegionInfo().getRegionNameAsString()) - + " is not mergeable because it has merge qualifier in META"); - return false; - } - } catch (IOException e) { - LOG.warn("Failed judging whether merge transaction is available for " - + region_a.getRegionInfo().getRegionNameAsString() + " and " - + region_b.getRegionInfo().getRegionNameAsString(), e); - return false; - } - - // WARN: make sure there is no parent region of the two merging regions in - // hbase:meta If exists, fixing up daughters would cause daughter regions(we - // have merged one) online again when we restart master, so we should clear - // the parent region to prevent the above case - // Since HBASE-7721, we don't need fix up daughters any more. so here do - // nothing - - this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(), - region_b.getRegionInfo()); - return true; - } + boolean prepare(RegionServerServices services) throws IOException; /** * Run the transaction. @@ -222,325 +172,10 @@ public class RegionMergeTransaction { * @throws IOException * @see #rollback(Server, RegionServerServices) */ - public HRegion execute(final Server server, - final RegionServerServices services) throws IOException { - if (rsCoprocessorHost == null) { - rsCoprocessorHost = server != null ? 
- ((HRegionServer) server).getRegionServerCoprocessorHost() : null; - } - HRegion mergedRegion = createMergedRegion(server, services); - if (rsCoprocessorHost != null) { - rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion); - } - return stepsAfterPONR(server, services, mergedRegion); - } - - public HRegion stepsAfterPONR(final Server server, final RegionServerServices services, - HRegion mergedRegion) throws IOException { - openMergedRegion(server, services, mergedRegion); - if (rsCoprocessorHost != null) { - rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion); - } - return mergedRegion; - } - - /** - * Prepare the merged region and region files. - * @param server Hosting server instance. Can be null when testing - * @param services Used to online/offline regions. - * @return merged region - * @throws IOException If thrown, transaction failed. Call - * {@link #rollback(Server, RegionServerServices)} - */ - HRegion createMergedRegion(final Server server, - final RegionServerServices services) throws IOException { - LOG.info("Starting merge of " + region_a + " and " - + region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible); - if ((server != null && server.isStopped()) - || (services != null && services.isStopping())) { - throw new IOException("Server is stopped or stopping"); - } - - if (rsCoprocessorHost != null) { - if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) { - throw new IOException("Coprocessor bypassing regions " + this.region_a + " " - + this.region_b + " merge."); - } - } - - // If true, no cluster to write meta edits to or to use coordination. - boolean testing = server == null ? 
true : server.getConfiguration() - .getBoolean("hbase.testing.nocluster", false); - - HRegion mergedRegion = stepsBeforePONR(server, services, testing); - - @MetaMutationAnnotation - List metaEntries = new ArrayList(); - if (rsCoprocessorHost != null) { - if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) { - throw new IOException("Coprocessor bypassing regions " + this.region_a + " " - + this.region_b + " merge."); - } - try { - for (Mutation p : metaEntries) { - HRegionInfo.parseRegionName(p.getRow()); - } - } catch (IOException e) { - LOG.error("Row key of mutation from coprocessor is not parsable as region name." - + "Mutations from coprocessor should only be for hbase:meta table.", e); - throw e; - } - } - - // This is the point of no return. Similar with SplitTransaction. - // IF we reach the PONR then subsequent failures need to crash out this - // regionserver - this.journal.add(JournalEntry.PONR); - - // Add merged region and delete region_a and region_b - // as an atomic update. See HBASE-7721. This update to hbase:meta makes the region - // will determine whether the region is merged or not in case of failures. 
- // If it is successful, master will roll-forward, if not, master will - // rollback - if (services != null && !services.reportRegionStateTransition(TransitionCode.MERGE_PONR, - mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { - // Passed PONR, let SSH clean it up - throw new IOException("Failed to notify master that merge passed PONR: " - + region_a.getRegionInfo().getRegionNameAsString() + " and " - + region_b.getRegionInfo().getRegionNameAsString()); - } - return mergedRegion; - } - - public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA, - HRegionInfo regionB, ServerName serverName, List mutations) throws IOException { - HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); - - // Put for parent - Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged); - putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, regionA.toByteArray()); - putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, regionB.toByteArray()); - mutations.add(putOfMerged); - // Deletes for merging regions - Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA); - Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB); - mutations.add(deleteA); - mutations.add(deleteB); - // The merged is a new region, openSeqNum = 1 is fine. 
- addLocation(putOfMerged, serverName, 1); - } - - public Put addLocation(final Put p, final ServerName sn, long openSeqNum) { - p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes - .toBytes(sn.getHostAndPort())); - p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn - .getStartcode())); - p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum)); - return p; - } - - public HRegion stepsBeforePONR(final Server server, final RegionServerServices services, - boolean testing) throws IOException { - if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE, - mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { - throw new IOException("Failed to get ok from master to merge " - + region_a.getRegionInfo().getRegionNameAsString() + " and " - + region_b.getRegionInfo().getRegionNameAsString()); - } - this.journal.add(JournalEntry.SET_MERGING); - - this.region_a.getRegionFileSystem().createMergesDir(); - this.journal.add(JournalEntry.CREATED_MERGE_DIR); - - Map> hstoreFilesOfRegionA = closeAndOfflineRegion( - services, this.region_a, true, testing); - Map> hstoreFilesOfRegionB = closeAndOfflineRegion( - services, this.region_b, false, testing); - - assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null; - - - // - // mergeStoreFiles creates merged region dirs under the region_a merges dir - // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will - // clean this up. - mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB); - - // Log to the journal that we are creating merged region. We could fail - // halfway through. If we do, we could have left - // stuff in fs that needs cleanup -- a storefile or two. Thats why we - // add entry to journal BEFORE rather than AFTER the change. 
- this.journal.add(JournalEntry.STARTED_MERGED_REGION_CREATION); - HRegion mergedRegion = createMergedRegionFromMerges(this.region_a, - this.region_b, this.mergedRegionInfo); - return mergedRegion; - } - - /** - * Create a merged region from the merges directory under region a. In order - * to mock it for tests, place it with a new method. - * @param a hri of region a - * @param b hri of region b - * @param mergedRegion hri of merged region - * @return merged HRegion. - * @throws IOException - */ - HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b, - final HRegionInfo mergedRegion) throws IOException { - return a.createMergedRegionFromMerges(mergedRegion, b); - } - - /** - * Close the merging region and offline it in regionserver - * @param services - * @param region - * @param isRegionA true if it is merging region a, false if it is region b - * @param testing true if it is testing - * @return a map of family name to list of store files - * @throws IOException - */ - private Map> closeAndOfflineRegion( - final RegionServerServices services, final HRegion region, - final boolean isRegionA, final boolean testing) throws IOException { - Map> hstoreFilesToMerge = null; - Exception exceptionToThrow = null; - try { - hstoreFilesToMerge = region.close(false); - } catch (Exception e) { - exceptionToThrow = e; - } - if (exceptionToThrow == null && hstoreFilesToMerge == null) { - // The region was closed by a concurrent thread. We can't continue - // with the merge, instead we must just abandon the merge. If we - // reopen or merge this could cause problems because the region has - // probably already been moved to a different server, or is in the - // process of moving to a different server. - exceptionToThrow = closedByOtherException; - } - if (exceptionToThrow != closedByOtherException) { - this.journal.add(isRegionA ? 
JournalEntry.CLOSED_REGION_A - : JournalEntry.CLOSED_REGION_B); - } - if (exceptionToThrow != null) { - if (exceptionToThrow instanceof IOException) - throw (IOException) exceptionToThrow; - throw new IOException(exceptionToThrow); - } - - if (!testing) { - services.removeFromOnlineRegions(region, null); - } - this.journal.add(isRegionA ? JournalEntry.OFFLINED_REGION_A - : JournalEntry.OFFLINED_REGION_B); - return hstoreFilesToMerge; - } - - /** - * Get merged region info through the specified two regions - * @param a merging region A - * @param b merging region B - * @return the merged region info - */ - public static HRegionInfo getMergedRegionInfo(final HRegionInfo a, - final HRegionInfo b) { - long rid = EnvironmentEdgeManager.currentTime(); - // Regionid is timestamp. Merged region's id can't be less than that of - // merging regions else will insert at wrong location in hbase:meta - if (rid < a.getRegionId() || rid < b.getRegionId()) { - LOG.warn("Clock skew; merging regions id are " + a.getRegionId() - + " and " + b.getRegionId() + ", but current time here is " + rid); - rid = Math.max(a.getRegionId(), b.getRegionId()) + 1; - } - - byte[] startKey = null; - byte[] endKey = null; - // Choose the smaller as start key - if (a.compareTo(b) <= 0) { - startKey = a.getStartKey(); - } else { - startKey = b.getStartKey(); - } - // Choose the bigger as end key - if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY) - || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY) - && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) { - endKey = a.getEndKey(); - } else { - endKey = b.getEndKey(); - } - - // Merged region is sorted between two merging regions in META - HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey, - endKey, false, rid); - return mergedRegionInfo; - } - - /** - * Perform time consuming opening of the merged region. - * @param server Hosting server instance. 
Can be null when testing - * @param services Used to online/offline regions. - * @param merged the merged region - * @throws IOException If thrown, transaction failed. Call - * {@link #rollback(Server, RegionServerServices)} - */ - void openMergedRegion(final Server server, - final RegionServerServices services, HRegion merged) throws IOException { - boolean stopped = server != null && server.isStopped(); - boolean stopping = services != null && services.isStopping(); - if (stopped || stopping) { - LOG.info("Not opening merged region " + merged.getRegionInfo().getRegionNameAsString() - + " because stopping=" + stopping + ", stopped=" + stopped); - return; - } - HRegionInfo hri = merged.getRegionInfo(); - LoggingProgressable reporter = server == null ? null - : new LoggingProgressable(hri, server.getConfiguration().getLong( - "hbase.regionserver.regionmerge.open.log.interval", 10000)); - merged.openHRegion(reporter); - - if (services != null) { - if (!services.reportRegionStateTransition(TransitionCode.MERGED, - mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { - throw new IOException("Failed to report merged region to master: " - + mergedRegionInfo.getShortNameToLog()); - } - services.addToOnlineRegions(merged); - } - } - - /** - * Create reference file(s) of merging regions under the region_a merges dir - * @param hstoreFilesOfRegionA - * @param hstoreFilesOfRegionB - * @throws IOException - */ - private void mergeStoreFiles( - Map> hstoreFilesOfRegionA, - Map> hstoreFilesOfRegionB) - throws IOException { - // Create reference file(s) of region A in mergdir - HRegionFileSystem fs_a = this.region_a.getRegionFileSystem(); - for (Map.Entry> entry : hstoreFilesOfRegionA - .entrySet()) { - String familyName = Bytes.toString(entry.getKey()); - for (StoreFile storeFile : entry.getValue()) { - fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile, - this.mergesdir); - } - } - // Create reference file(s) of region B in mergedir - 
HRegionFileSystem fs_b = this.region_b.getRegionFileSystem(); - for (Map.Entry> entry : hstoreFilesOfRegionB - .entrySet()) { - String familyName = Bytes.toString(entry.getKey()); - for (StoreFile storeFile : entry.getValue()) { - fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile, - this.mergesdir); - } - } - } + Region execute(Server server, RegionServerServices services) throws IOException; /** + * Roll back a failed transaction * @param server Hosting server instance (May be null when testing). * @param services Services of regionserver, used to online regions. * @throws IOException If thrown, rollback failed. Take drastic action. @@ -548,123 +183,37 @@ public class RegionMergeTransaction { * of no return and so now need to abort the server to minimize * damage. */ - @SuppressWarnings("deprecation") - public boolean rollback(final Server server, - final RegionServerServices services) throws IOException { - assert this.mergedRegionInfo != null; - // Coprocessor callback - if (rsCoprocessorHost != null) { - rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b); - } - - boolean result = true; - ListIterator iterator = this.journal - .listIterator(this.journal.size()); - // Iterate in reverse. - while (iterator.hasPrevious()) { - JournalEntry je = iterator.previous(); - switch (je) { - - case SET_MERGING: - if (services != null - && !services.reportRegionStateTransition(TransitionCode.MERGE_REVERTED, - mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { - return false; - } - break; - - case CREATED_MERGE_DIR: - this.region_a.writestate.writesEnabled = true; - this.region_b.writestate.writesEnabled = true; - this.region_a.getRegionFileSystem().cleanupMergesDir(); - break; - - case CLOSED_REGION_A: - try { - // So, this returns a seqid but if we just closed and then reopened, - // we should be ok. 
On close, we flushed using sequenceid obtained - // from hosting regionserver so no need to propagate the sequenceid - // returned out of initialize below up into regionserver as we - // normally do. - this.region_a.initialize(); - } catch (IOException e) { - LOG.error("Failed rollbacking CLOSED_REGION_A of region " - + this.region_a.getRegionInfo().getRegionNameAsString(), e); - throw new RuntimeException(e); - } - break; - - case OFFLINED_REGION_A: - if (services != null) - services.addToOnlineRegions(this.region_a); - break; - - case CLOSED_REGION_B: - try { - this.region_b.initialize(); - } catch (IOException e) { - LOG.error("Failed rollbacking CLOSED_REGION_A of region " - + this.region_b.getRegionInfo().getRegionNameAsString(), e); - throw new RuntimeException(e); - } - break; - - case OFFLINED_REGION_B: - if (services != null) - services.addToOnlineRegions(this.region_b); - break; - - case STARTED_MERGED_REGION_CREATION: - this.region_a.getRegionFileSystem().cleanupMergedRegion( - this.mergedRegionInfo); - break; - - case PONR: - // We got to the point-of-no-return so we need to just abort. Return - // immediately. Do not clean up created merged regions. - return false; - - default: - throw new RuntimeException("Unhandled journal entry: " + je); - } - } - // Coprocessor callback - if (rsCoprocessorHost != null) { - rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b); - } - - return result; - } - - HRegionInfo getMergedRegionInfo() { - return this.mergedRegionInfo; - } - - // For unit testing. 
- Path getMergesDir() { - return this.mergesdir; - } + boolean rollback(Server server, RegionServerServices services) throws IOException; /** - * Checks if the given region has merge qualifier in hbase:meta - * @param services - * @param regionName name of specified region - * @return true if the given region has merge qualifier in META.(It will be - * cleaned by CatalogJanitor) - * @throws IOException + * Register a listener for transaction preparation, execution, and possibly + * rollback phases. + *

A listener can abort a transaction by throwing an exception. + * @param listener the listener + * @return 'this' for chaining */ - boolean hasMergeQualifierInMeta(final RegionServerServices services, - final byte[] regionName) throws IOException { - if (services == null) return false; - // Get merge regions if it is a merged region and already has merge - // qualifier - Pair mergeRegions = MetaTableAccessor - .getRegionsFromMergeQualifier(services.getConnection(), regionName); - if (mergeRegions != null && - (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) { - // It has merge qualifier - return true; - } - return false; - } + RegionMergeTransaction registerTransactionListener(TransactionListener listener); + + /** @return merged region info */ + HRegionInfo getMergedRegionInfo(); + + /** + * Get the journal for the transaction. + *

Journal entries are an opaque type represented as JournalEntry. They can + * also provide useful debugging information via their toString method. + * @return the transaction journal + */ + List getJournal(); + + /** + * Get the Server running the transaction or rollback + * @return server instance + */ + Server getServer(); + + /** + * Get the RegonServerServices of the server running the transaction or rollback + * @return region server services + */ + RegionServerServices getRegionServerServices(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionFactory.java new file mode 100644 index 00000000000..c844d547296 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionFactory.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * A factory for creating RegionMergeTransactions, which execute region merge as a "transaction". + * See {@link RegionMergeTransactionImpl} + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public class RegionMergeTransactionFactory implements Configurable { + + public static final String MERGE_TRANSACTION_IMPL_KEY = + "hbase.regionserver.merge.transaction.impl"; + + private Configuration conf; + + public RegionMergeTransactionFactory(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Create a merge transaction from the class configured under {@link #MERGE_TRANSACTION_IMPL_KEY} + * @param a region a to merge + * @param b region b to merge + * @param forcible if false, we will only merge adjacent regions + * @return transaction instance + */ + public RegionMergeTransactionImpl create(final Region a, final Region b, + final boolean forcible) { + // The implementation class must extend RegionMergeTransactionImpl, not only + // implement the RegionMergeTransaction interface like you might expect, + // because various places such as AssignmentManager use static methods + // from RegionMergeTransactionImpl. Whatever we use for implementation must + // be compatible, so it's safest to require ? extends RegionMergeTransactionImpl. + // If not compatible we will throw a runtime exception from here.
+ return ReflectionUtils.instantiateWithCustomCtor( + conf.getClass(MERGE_TRANSACTION_IMPL_KEY, RegionMergeTransactionImpl.class, + RegionMergeTransactionImpl.class).getName(), + new Class[] { Region.class, Region.class, boolean.class }, + new Object[] { a, b, forcible }); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java new file mode 100644 index 00000000000..bf695343958 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java @@ -0,0 +1,702 @@ +/** + * Copyright The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with this + * work for additional information regarding copyright ownership. The ASF + * licenses this file to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations + * under the License.
+ */ +package org.apache.hadoop.hbase.regionserver; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.ListIterator; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.MetaMutationAnnotation; +import org.apache.hadoop.hbase.MetaTableAccessor; +import org.apache.hadoop.hbase.Server; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; +import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Pair; + +import com.google.common.annotations.VisibleForTesting; + +@InterfaceAudience.Private +public class RegionMergeTransactionImpl implements RegionMergeTransaction { + private static final Log LOG = LogFactory.getLog(RegionMergeTransactionImpl.class); + + // Merged region info + private HRegionInfo mergedRegionInfo; + // region_a sorts before region_b + private final HRegion region_a; + private final HRegion region_b; + // merges dir is under region_a + private final Path mergesdir; + // We only merge adjacent regions if forcible is false + private final boolean forcible; + + /* + * Transaction state for listener, only valid during execute and + * rollback + */ + private RegionMergeTransactionPhase currentPhase = RegionMergeTransactionPhase.STARTED; + private Server server; + private RegionServerServices rsServices; + + public static 
class JournalEntryImpl implements JournalEntry { + private RegionMergeTransactionPhase type; + private long timestamp; + + public JournalEntryImpl(RegionMergeTransactionPhase type) { + this(type, EnvironmentEdgeManager.currentTime()); + } + + public JournalEntryImpl(RegionMergeTransactionPhase type, long timestamp) { + this.type = type; + this.timestamp = timestamp; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append(type); + sb.append(" at "); + sb.append(timestamp); + return sb.toString(); + } + + @Override + public RegionMergeTransactionPhase getPhase() { + return type; + } + + @Override + public long getTimeStamp() { + return timestamp; + } + } + + /* + * Journal of how far the merge transaction has progressed. + */ + private final List journal = new ArrayList(); + + /** + * Listeners + */ + private final ArrayList listeners = new ArrayList(); + + private static IOException closedByOtherException = new IOException( + "Failed to close region: already closed by another thread"); + + private RegionServerCoprocessorHost rsCoprocessorHost = null; + + /** + * Constructor. Declared on {@link Region} so the (Region, Region, boolean) lookup used by RegionMergeTransactionFactory can find it. + * @param a region a to merge; must be an {@link HRegion} + * @param b region b to merge; must be an {@link HRegion} + * @param forcible if false, we will only merge adjacent regions + */ + public RegionMergeTransactionImpl(final Region a, final Region b, + final boolean forcible) { + if (a.getRegionInfo().compareTo(b.getRegionInfo()) <= 0) { + this.region_a = (HRegion) a; + this.region_b = (HRegion) b; + } else { + this.region_a = (HRegion) b; + this.region_b = (HRegion) a; + } + this.forcible = forcible; + this.mergesdir = region_a.getRegionFileSystem().getMergesDir(); + } + + private void transition(RegionMergeTransactionPhase nextPhase) throws IOException { + transition(nextPhase, false); + } + + private void transition(RegionMergeTransactionPhase nextPhase, boolean isRollback) + throws IOException { + if (!isRollback) { + // Add to the journal first, because if the listener throws an exception + // we need to roll back starting at
'nextPhase' + this.journal.add(new JournalEntryImpl(nextPhase)); + } + for (int i = 0; i < listeners.size(); i++) { + TransactionListener listener = listeners.get(i); + if (!isRollback) { + listener.transition(this, currentPhase, nextPhase); + } else { + listener.rollback(this, currentPhase, nextPhase); + } + } + currentPhase = nextPhase; + } + + @Override + public boolean prepare(final RegionServerServices services) throws IOException { + if (!region_a.getTableDesc().getTableName() + .equals(region_b.getTableDesc().getTableName())) { + LOG.info("Can't merge regions " + region_a + "," + region_b + + " because they do not belong to the same table"); + return false; + } + if (region_a.getRegionInfo().equals(region_b.getRegionInfo())) { + LOG.info("Can't merge the same region " + region_a); + return false; + } + if (!forcible && !HRegionInfo.areAdjacent(region_a.getRegionInfo(), + region_b.getRegionInfo())) { + String msg = "Skip merging " + region_a.getRegionInfo().getRegionNameAsString() + + " and " + region_b.getRegionInfo().getRegionNameAsString() + + ", because they are not adjacent."; + LOG.info(msg); + return false; + } + if (!this.region_a.isMergeable() || !this.region_b.isMergeable()) { + return false; + } + try { + boolean regionAHasMergeQualifier = hasMergeQualifierInMeta(services, + region_a.getRegionInfo().getRegionName()); + if (regionAHasMergeQualifier || + hasMergeQualifierInMeta(services, region_b.getRegionInfo().getRegionName())) { + LOG.debug("Region " + (regionAHasMergeQualifier ? 
+ region_a.getRegionInfo().getRegionNameAsString() + : region_b.getRegionInfo().getRegionNameAsString()) + + " is not mergeable because it has merge qualifier in META"); + return false; + } + } catch (IOException e) { + LOG.warn("Failed judging whether merge transaction is available for " + + region_a.getRegionInfo().getRegionNameAsString() + " and " + + region_b.getRegionInfo().getRegionNameAsString(), e); + return false; + } + + // WARN: make sure there is no parent region of the two merging regions in + // hbase:meta If exists, fixing up daughters would cause daughter regions(we + // have merged one) online again when we restart master, so we should clear + // the parent region to prevent the above case + // Since HBASE-7721, we don't need fix up daughters any more. so here do + // nothing + + this.mergedRegionInfo = getMergedRegionInfo(region_a.getRegionInfo(), + region_b.getRegionInfo()); + + transition(RegionMergeTransactionPhase.PREPARED); + return true; + } + + @Override + public Region execute(final Server server, final RegionServerServices services) + throws IOException { + this.server = server; + this.rsServices = services; + if (rsCoprocessorHost == null) { + rsCoprocessorHost = server != null ? + ((HRegionServer) server).getRegionServerCoprocessorHost() : null; + } + HRegion mergedRegion = createMergedRegion(server, services); + if (rsCoprocessorHost != null) { + rsCoprocessorHost.postMergeCommit(this.region_a, this.region_b, mergedRegion); + } + stepsAfterPONR(server, services, mergedRegion); + + transition(RegionMergeTransactionPhase.COMPLETED); + + return mergedRegion; + } + + @VisibleForTesting + public void stepsAfterPONR(final Server server, final RegionServerServices services, + HRegion mergedRegion) throws IOException { + openMergedRegion(server, services, mergedRegion); + if (rsCoprocessorHost != null) { + rsCoprocessorHost.postMerge(this.region_a, this.region_b, mergedRegion); + } + } + + /** + * Prepare the merged region and region files. 
+ * @param server Hosting server instance. Can be null when testing + * @param services Used to online/offline regions. + * @return merged region + * @throws IOException If thrown, transaction failed. Call + * {@link #rollback(Server, RegionServerServices)} + */ + private HRegion createMergedRegion(final Server server, final RegionServerServices services) + throws IOException { + LOG.info("Starting merge of " + region_a + " and " + + region_b.getRegionInfo().getRegionNameAsString() + ", forcible=" + forcible); + if ((server != null && server.isStopped()) + || (services != null && services.isStopping())) { + throw new IOException("Server is stopped or stopping"); + } + + if (rsCoprocessorHost != null) { + if (rsCoprocessorHost.preMerge(this.region_a, this.region_b)) { + throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + + this.region_b + " merge."); + } + } + + // If true, no cluster to write meta edits to or to use coordination. + boolean testing = server == null ? true : server.getConfiguration() + .getBoolean("hbase.testing.nocluster", false); + + HRegion mergedRegion = stepsBeforePONR(server, services, testing); + + @MetaMutationAnnotation + List metaEntries = new ArrayList(); + if (rsCoprocessorHost != null) { + if (rsCoprocessorHost.preMergeCommit(this.region_a, this.region_b, metaEntries)) { + throw new IOException("Coprocessor bypassing regions " + this.region_a + " " + + this.region_b + " merge."); + } + try { + for (Mutation p : metaEntries) { + HRegionInfo.parseRegionName(p.getRow()); + } + } catch (IOException e) { + LOG.error("Row key of mutation from coprocessor is not parsable as region name." + + "Mutations from coprocessor should only be for hbase:meta table.", e); + throw e; + } + } + + // This is the point of no return. Similar with SplitTransaction. 
+ // If we reach the PONR then subsequent failures need to crash out this + // regionserver + transition(RegionMergeTransactionPhase.PONR); + + // Add merged region and delete region_a and region_b + // as an atomic update. See HBASE-7721. This update to hbase:meta + // will determine whether the region is merged or not in case of failures. + // If it is successful, master will roll-forward, if not, master will + // rollback + if (services != null && !services.reportRegionStateTransition(TransitionCode.MERGE_PONR, + mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { + // Passed PONR, let SSH clean it up + throw new IOException("Failed to notify master that merge passed PONR: " + + region_a.getRegionInfo().getRegionNameAsString() + " and " + + region_b.getRegionInfo().getRegionNameAsString()); + } + return mergedRegion; + } + + @VisibleForTesting + public void prepareMutationsForMerge(HRegionInfo mergedRegion, HRegionInfo regionA, + HRegionInfo regionB, ServerName serverName, List mutations) throws IOException { + HRegionInfo copyOfMerged = new HRegionInfo(mergedRegion); + + // Put for parent + Put putOfMerged = MetaTableAccessor.makePutFromRegionInfo(copyOfMerged); + putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEA_QUALIFIER, + regionA.toByteArray()); + putOfMerged.add(HConstants.CATALOG_FAMILY, HConstants.MERGEB_QUALIFIER, + regionB.toByteArray()); + mutations.add(putOfMerged); + // Deletes for merging regions + Delete deleteA = MetaTableAccessor.makeDeleteFromRegionInfo(regionA); + Delete deleteB = MetaTableAccessor.makeDeleteFromRegionInfo(regionB); + mutations.add(deleteA); + mutations.add(deleteB); + // The merged is a new region, openSeqNum = 1 is fine.
+ addLocation(putOfMerged, serverName, 1); + } + + @VisibleForTesting + Put addLocation(final Put p, final ServerName sn, long openSeqNum) { + p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes + .toBytes(sn.getHostAndPort())); + p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn + .getStartcode())); + p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum)); + return p; + } + + @VisibleForTesting + public HRegion stepsBeforePONR(final Server server, final RegionServerServices services, + boolean testing) throws IOException { + if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_MERGE, + mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { + throw new IOException("Failed to get ok from master to merge " + + region_a.getRegionInfo().getRegionNameAsString() + " and " + + region_b.getRegionInfo().getRegionNameAsString()); + } + + transition(RegionMergeTransactionPhase.SET_MERGING); + + this.region_a.getRegionFileSystem().createMergesDir(); + + transition(RegionMergeTransactionPhase.CREATED_MERGE_DIR); + + Map> hstoreFilesOfRegionA = closeAndOfflineRegion( + services, this.region_a, true, testing); + Map> hstoreFilesOfRegionB = closeAndOfflineRegion( + services, this.region_b, false, testing); + + assert hstoreFilesOfRegionA != null && hstoreFilesOfRegionB != null; + + // mergeStoreFiles creates merged region dirs under the region_a merges dir + // Nothing to unroll here if failure -- clean up of CREATE_MERGE_DIR will + // clean this up. + mergeStoreFiles(hstoreFilesOfRegionA, hstoreFilesOfRegionB); + + // Log to the journal that we are creating merged region. We could fail + // halfway through. If we do, we could have left + // stuff in fs that needs cleanup -- a storefile or two. Thats why we + // add entry to journal BEFORE rather than AFTER the change. 
+ + transition(RegionMergeTransactionPhase.STARTED_MERGED_REGION_CREATION); + + HRegion mergedRegion = createMergedRegionFromMerges(this.region_a, + this.region_b, this.mergedRegionInfo); + return mergedRegion; + } + + /** + * Create a merged region from the merges directory under region a. In order + * to mock it for tests, place it with a new method. + * @param a hri of region a + * @param b hri of region b + * @param mergedRegion hri of merged region + * @return merged HRegion. + * @throws IOException + */ + @VisibleForTesting + HRegion createMergedRegionFromMerges(final HRegion a, final HRegion b, + final HRegionInfo mergedRegion) throws IOException { + return a.createMergedRegionFromMerges(mergedRegion, b); + } + + /** + * Close the merging region and offline it in regionserver + * @param services + * @param region + * @param isRegionA true if it is merging region a, false if it is region b + * @param testing true if it is testing + * @return a map of family name to list of store files + * @throws IOException + */ + private Map> closeAndOfflineRegion( + final RegionServerServices services, final HRegion region, + final boolean isRegionA, final boolean testing) throws IOException { + Map> hstoreFilesToMerge = null; + Exception exceptionToThrow = null; + try { + hstoreFilesToMerge = region.close(false); + } catch (Exception e) { + exceptionToThrow = e; + } + if (exceptionToThrow == null && hstoreFilesToMerge == null) { + // The region was closed by a concurrent thread. We can't continue + // with the merge, instead we must just abandon the merge. If we + // reopen or merge this could cause problems because the region has + // probably already been moved to a different server, or is in the + // process of moving to a different server. + exceptionToThrow = closedByOtherException; + } + if (exceptionToThrow != closedByOtherException) { + transition(isRegionA ? 
RegionMergeTransactionPhase.CLOSED_REGION_A + : RegionMergeTransactionPhase.CLOSED_REGION_B); + } + if (exceptionToThrow != null) { + if (exceptionToThrow instanceof IOException) + throw (IOException) exceptionToThrow; + throw new IOException(exceptionToThrow); + } + if (!testing) { + services.removeFromOnlineRegions(region, null); + } + + transition(isRegionA ? RegionMergeTransactionPhase.OFFLINED_REGION_A + : RegionMergeTransactionPhase.OFFLINED_REGION_B); + + return hstoreFilesToMerge; + } + + /** + * Get merged region info through the specified two regions + * @param a merging region A + * @param b merging region B + * @return the merged region info + */ + @VisibleForTesting + static HRegionInfo getMergedRegionInfo(final HRegionInfo a, final HRegionInfo b) { + long rid = EnvironmentEdgeManager.currentTime(); + // Regionid is timestamp. Merged region's id can't be less than that of + // merging regions else will insert at wrong location in hbase:meta + if (rid < a.getRegionId() || rid < b.getRegionId()) { + LOG.warn("Clock skew; merging regions id are " + a.getRegionId() + + " and " + b.getRegionId() + ", but current time here is " + rid); + rid = Math.max(a.getRegionId(), b.getRegionId()) + 1; + } + + byte[] startKey = null; + byte[] endKey = null; + // Choose the smaller as start key + if (a.compareTo(b) <= 0) { + startKey = a.getStartKey(); + } else { + startKey = b.getStartKey(); + } + // Choose the bigger as end key + if (Bytes.equals(a.getEndKey(), HConstants.EMPTY_BYTE_ARRAY) + || (!Bytes.equals(b.getEndKey(), HConstants.EMPTY_BYTE_ARRAY) + && Bytes.compareTo(a.getEndKey(), b.getEndKey()) > 0)) { + endKey = a.getEndKey(); + } else { + endKey = b.getEndKey(); + } + + // Merged region is sorted between two merging regions in META + HRegionInfo mergedRegionInfo = new HRegionInfo(a.getTable(), startKey, + endKey, false, rid); + return mergedRegionInfo; + } + + /** + * Perform time consuming opening of the merged region. 
+ * @param server Hosting server instance. Can be null when testing + * @param services Used to online/offline regions. + * @param merged the merged region + * @throws IOException If thrown, transaction failed. Call + * {@link #rollback(Server, RegionServerServices)} + */ + @VisibleForTesting + void openMergedRegion(final Server server, final RegionServerServices services, + HRegion merged) throws IOException { + boolean stopped = server != null && server.isStopped(); + boolean stopping = services != null && services.isStopping(); + if (stopped || stopping) { + LOG.info("Not opening merged region " + merged.getRegionInfo().getRegionNameAsString() + + " because stopping=" + stopping + ", stopped=" + stopped); + return; + } + HRegionInfo hri = merged.getRegionInfo(); + LoggingProgressable reporter = server == null ? null + : new LoggingProgressable(hri, server.getConfiguration().getLong( + "hbase.regionserver.regionmerge.open.log.interval", 10000)); + merged.openHRegion(reporter); + + if (services != null) { + if (!services.reportRegionStateTransition(TransitionCode.MERGED, + mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { + throw new IOException("Failed to report merged region to master: " + + mergedRegionInfo.getShortNameToLog()); + } + services.addToOnlineRegions(merged); + } + } + + /** + * Create reference file(s) of merging regions under the region_a merges dir + * @param hstoreFilesOfRegionA + * @param hstoreFilesOfRegionB + * @throws IOException + */ + private void mergeStoreFiles( + Map> hstoreFilesOfRegionA, + Map> hstoreFilesOfRegionB) + throws IOException { + // Create reference file(s) of region A in mergdir + HRegionFileSystem fs_a = this.region_a.getRegionFileSystem(); + for (Map.Entry> entry : hstoreFilesOfRegionA + .entrySet()) { + String familyName = Bytes.toString(entry.getKey()); + for (StoreFile storeFile : entry.getValue()) { + fs_a.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile, + this.mergesdir); + } + } + 
// Create reference file(s) of region B in mergedir + HRegionFileSystem fs_b = this.region_b.getRegionFileSystem(); + for (Map.Entry> entry : hstoreFilesOfRegionB + .entrySet()) { + String familyName = Bytes.toString(entry.getKey()); + for (StoreFile storeFile : entry.getValue()) { + fs_b.mergeStoreFile(this.mergedRegionInfo, familyName, storeFile, + this.mergesdir); + } + } + } + + @Override + public boolean rollback(final Server server, + final RegionServerServices services) throws IOException { + assert this.mergedRegionInfo != null; + this.server = server; + this.rsServices = services; + // Coprocessor callback + if (rsCoprocessorHost != null) { + rsCoprocessorHost.preRollBackMerge(this.region_a, this.region_b); + } + + boolean result = true; + ListIterator iterator = this.journal + .listIterator(this.journal.size()); + // Iterate in reverse. + while (iterator.hasPrevious()) { + JournalEntry je = iterator.previous(); + + transition(je.getPhase(), true); + + switch (je.getPhase()) { + + case SET_MERGING: + if (services != null + && !services.reportRegionStateTransition(TransitionCode.MERGE_REVERTED, + mergedRegionInfo, region_a.getRegionInfo(), region_b.getRegionInfo())) { + return false; + } + break; + + case CREATED_MERGE_DIR: + this.region_a.writestate.writesEnabled = true; + this.region_b.writestate.writesEnabled = true; + this.region_a.getRegionFileSystem().cleanupMergesDir(); + break; + + case CLOSED_REGION_A: + try { + // So, this returns a seqid but if we just closed and then reopened, + // we should be ok. On close, we flushed using sequenceid obtained + // from hosting regionserver so no need to propagate the sequenceid + // returned out of initialize below up into regionserver as we + // normally do. 
+ this.region_a.initialize(); + } catch (IOException e) { + LOG.error("Failed rollbacking CLOSED_REGION_A of region " + + region_a.getRegionInfo().getRegionNameAsString(), e); + throw new RuntimeException(e); + } + break; + + case OFFLINED_REGION_A: + if (services != null) + services.addToOnlineRegions(this.region_a); + break; + + case CLOSED_REGION_B: + try { + this.region_b.initialize(); + } catch (IOException e) { + LOG.error("Failed rollbacking CLOSED_REGION_A of region " + + region_b.getRegionInfo().getRegionNameAsString(), e); + throw new RuntimeException(e); + } + break; + + case OFFLINED_REGION_B: + if (services != null) + services.addToOnlineRegions(this.region_b); + break; + + case STARTED_MERGED_REGION_CREATION: + this.region_a.getRegionFileSystem().cleanupMergedRegion( + this.mergedRegionInfo); + break; + + case PONR: + // We got to the point-of-no-return so we need to just abort. Return + // immediately. Do not clean up created merged regions. + return false; + + // Informational states only + case STARTED: + case PREPARED: + case COMPLETED: + break; + + default: + throw new RuntimeException("Unhandled journal entry: " + je); + } + } + // Coprocessor callback + if (rsCoprocessorHost != null) { + rsCoprocessorHost.postRollBackMerge(this.region_a, this.region_b); + } + + return result; + } + + @Override + public HRegionInfo getMergedRegionInfo() { + return this.mergedRegionInfo; + } + + @VisibleForTesting + Path getMergesDir() { + return this.mergesdir; + } + + /** + * Checks if the given region has merge qualifier in hbase:meta + * @param services + * @param regionName name of specified region + * @return true if the given region has merge qualifier in META.(It will be + * cleaned by CatalogJanitor) + * @throws IOException + */ + @VisibleForTesting + boolean hasMergeQualifierInMeta(final RegionServerServices services, final byte[] regionName) + throws IOException { + if (services == null) return false; + // Get merge regions if it is a merged region and 
already has merge + // qualifier + Pair mergeRegions = MetaTableAccessor + .getRegionsFromMergeQualifier(services.getConnection(), regionName); + if (mergeRegions != null && + (mergeRegions.getFirst() != null || mergeRegions.getSecond() != null)) { + // It has merge qualifier + return true; + } + return false; + } + + @Override + public List getJournal() { + return journal; + } + + @Override + public RegionMergeTransaction registerTransactionListener(TransactionListener listener) { + listeners.add(listener); + return this; + } + + @Override + public Server getServer() { + return server; + } + + @Override + public RegionServerServices getRegionServerServices() { + return rsServices; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java.rej b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java.rej new file mode 100644 index 00000000000..45e9534c47a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java.rej @@ -0,0 +1,10 @@ +--- hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java ++++ hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RegionMergeTransactionImpl.java +@@ -41,6 +41,7 @@ + import org.apache.hadoop.hbase.coordination.BaseCoordinatedStateManager; + import org.apache.hadoop.hbase.coordination.RegionMergeCoordination.RegionMergeDetails; + import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; ++import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.JournalEntryImpl; + import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl.LoggingProgressable; + import org.apache.hadoop.hbase.util.Bytes; + import org.apache.hadoop.hbase.util.ConfigUtil; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java index 9034a72a91f..b1600c03130 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitRequest.java @@ -64,7 +64,7 @@ class SplitRequest implements Runnable { boolean success = false; server.metricsRegionServer.incrSplitRequest(); long startTime = EnvironmentEdgeManager.currentTime(); - SplitTransaction st = new SplitTransaction(parent, midKey); + SplitTransactionImpl st = new SplitTransactionImpl(parent, midKey); try { //acquire a shared read lock on the table, so that table schema modifications //do not happen concurrently diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java index bbee93c5bfe..a21c19d7b0e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransaction.java @@ -1,5 +1,4 @@ -/** - * +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -19,93 +18,62 @@ package org.apache.hadoop.hbase.regionserver; import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; import java.util.List; -import java.util.ListIterator; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executors; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.classification.InterfaceAudience; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.HRegionInfo; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; import org.apache.hadoop.hbase.Server; -import org.apache.hadoop.hbase.ServerName; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.HasThread; -import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; import org.apache.hadoop.hbase.util.PairOfSameType; -import org.apache.zookeeper.KeeperException; - -import com.google.common.util.concurrent.ThreadFactoryBuilder; /** * Executes region split as a "transaction". 
Call {@link #prepare()} to setup * the transaction, {@link #execute(Server, RegionServerServices)} to run the * transaction and {@link #rollback(Server, RegionServerServices)} to cleanup if execute fails. * - *

Here is an example of how you would use this class: + *

Here is an example of how you would use this interface: *

- *  SplitTransaction st = new SplitTransaction(this.conf, parent, midKey)
+ *  SplitTransactionFactory factory = new SplitTransactionFactory(conf);
+ *  SplitTransaction st = factory.create(parent, midKey)
+ *    .registerTransactionListener(new TransactionListener() {
+ *       public void transition(SplitTransaction transaction, SplitTransactionPhase from,
+ *           SplitTransactionPhase to) throws IOException {
+ *         // ...
+ *       }
+ *       public void rollback(SplitTransaction transaction, SplitTransactionPhase from,
+ *           SplitTransactionPhase to) {
+ *         // ...
+ *       }
+ *    });
  *  if (!st.prepare()) return;
  *  try {
  *    st.execute(server, services);
- *  } catch (IOException ioe) {
+ *  } catch (IOException e) {
  *    try {
  *      st.rollback(server, services);
  *      return;
  *    } catch (RuntimeException e) {
- *      myAbortable.abort("Failed split, abort");
+ *      // abort the server
  *    }
  *  }
  * 
- *

This class is not thread safe. Caller needs ensure split is run by + *

A split transaction is not thread safe. Callers must ensure a split is run by * one thread only. */ -@InterfaceAudience.Private -public class SplitTransaction { - private static final Log LOG = LogFactory.getLog(SplitTransaction.class); - - /* - * Region to split - */ - private final HRegion parent; - private HRegionInfo hri_a; - private HRegionInfo hri_b; - private long fileSplitTimeout = 30000; - - /* - * Row to split around - */ - private final byte [] splitrow; +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public interface SplitTransaction { /** - * Types to add to the transaction journal. - * Each enum is a step in the split transaction. Used to figure how much - * we need to rollback. + * Each enum is a step in the split transaction. */ - static enum JournalEntryType { + public enum SplitTransactionPhase { /** * Started */ STARTED, /** - * Prepared (after table lock) + * Prepared */ PREPARED, /** @@ -148,6 +116,12 @@ public class SplitTransaction { * Opened the second daughter region */ OPENED_REGION_B, + /** + * Point of no return. + * If we got here, then transaction is not recoverable other than by + * crashing out the regionserver. + */ + PONR, /** * Before postSplit coprocessor hook */ @@ -157,327 +131,60 @@ public class SplitTransaction { */ AFTER_POST_SPLIT_HOOK, /** - * Point of no return. - * If we got here, then transaction is not recoverable other than by - * crashing out the regionserver. 
+ * Completed */ - PONR - } - - static class JournalEntry { - private JournalEntryType type; - private long timestamp; - - public JournalEntry(JournalEntryType type) { - this(type, EnvironmentEdgeManager.currentTime()); - } - - public JournalEntry(JournalEntryType type, long timestamp) { - this.type = type; - this.timestamp = timestamp; - } - - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append(type); - sb.append(" at "); - sb.append(timestamp); - return sb.toString(); - } - } - - /* - * Journal of how far the split transaction has progressed. - */ - private final List journal = new ArrayList(); - - /** - * Constructor - * @param r Region to split - * @param splitrow Row to split around - */ - public SplitTransaction(final Region r, final byte [] splitrow) { - this.parent = (HRegion)r; - this.splitrow = splitrow; - this.journal.add(new JournalEntry(JournalEntryType.STARTED)); + COMPLETED } /** - * Does checks on split inputs. + * Split transaction journal entry + */ + public interface JournalEntry { + + /** @return the completed phase marked by this journal entry */ + SplitTransactionPhase getPhase(); + + /** @return the time of phase completion */ + long getTimeStamp(); + } + + /** + * Split transaction listener + */ + public interface TransactionListener { + + /** + * Invoked when transitioning forward from one transaction phase to another + * @param transaction the transaction + * @param from the current phase + * @param to the next phase + * @throws IOException listener can throw this to abort + */ + void transition(SplitTransaction transaction, SplitTransactionPhase from, + SplitTransactionPhase to) throws IOException; + + /** + * Invoked when rolling back a transaction from one transaction phase to the + * previous + * @param transaction the transaction + * @param from the current phase + * @param to the previous phase + */ + void rollback(SplitTransaction transaction, SplitTransactionPhase from, + 
SplitTransactionPhase to); + } + + /** + * Check split inputs and prepare the transaction. * @return true if the region is splittable else * false if it is not (e.g. its already closed, etc.). + * @throws IOException */ - public boolean prepare() { - if (!this.parent.isSplittable()) return false; - // Split key can be null if this region is unsplittable; i.e. has refs. - if (this.splitrow == null) return false; - HRegionInfo hri = this.parent.getRegionInfo(); - parent.prepareToSplit(); - // Check splitrow. - byte [] startKey = hri.getStartKey(); - byte [] endKey = hri.getEndKey(); - if (Bytes.equals(startKey, splitrow) || - !this.parent.getRegionInfo().containsRow(splitrow)) { - LOG.info("Split row is not inside region key range or is equal to " + - "startkey: " + Bytes.toStringBinary(this.splitrow)); - return false; - } - long rid = getDaughterRegionIdTimestamp(hri); - this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid); - this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid); - this.journal.add(new JournalEntry(JournalEntryType.PREPARED)); - return true; - } - - /** - * Calculate daughter regionid to use. - * @param hri Parent {@link HRegionInfo} - * @return Daughter region id (timestamp) to use. - */ - private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) { - long rid = EnvironmentEdgeManager.currentTime(); - // Regionid is timestamp. Can't be less than that of parent else will insert - // at wrong location in hbase:meta (See HBASE-710). - if (rid < hri.getRegionId()) { - LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() + - " but current time here is " + rid); - rid = hri.getRegionId() + 1; - } - return rid; - } - - private static IOException closedByOtherException = new IOException( - "Failed to close region: already closed by another thread"); - - /** - * Prepare the regions and region files. - * @param server Hosting server instance. 
Can be null when testing (won't try - * and update in zk if a null server) - * @param services Used to online/offline regions. - * @throws IOException If thrown, transaction failed. - * Call {@link #rollback(Server, RegionServerServices)} - * @return Regions created - */ - /* package */PairOfSameType createDaughters(final Server server, - final RegionServerServices services) throws IOException { - LOG.info("Starting split of region " + this.parent); - if ((server != null && server.isStopped()) || - (services != null && services.isStopping())) { - throw new IOException("Server is stopped or stopping"); - } - assert !this.parent.lock.writeLock().isHeldByCurrentThread(): - "Unsafe to hold write lock while performing RPCs"; - - journal.add(new JournalEntry(JournalEntryType.BEFORE_PRE_SPLIT_HOOK)); - - // Coprocessor callback - if (this.parent.getCoprocessorHost() != null) { - // TODO: Remove one of these - this.parent.getCoprocessorHost().preSplit(); - this.parent.getCoprocessorHost().preSplit(this.splitrow); - } - - journal.add(new JournalEntry(JournalEntryType.AFTER_PRE_SPLIT_HOOK)); - - // If true, no cluster to write meta edits to or to update znodes in. - boolean testing = server == null? true: - server.getConfiguration().getBoolean("hbase.testing.nocluster", false); - this.fileSplitTimeout = testing ? this.fileSplitTimeout : - server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout", - this.fileSplitTimeout); - - PairOfSameType daughterRegions = stepsBeforePONR(server, services, testing); - - List metaEntries = new ArrayList(); - if (this.parent.getCoprocessorHost() != null) { - if (this.parent.getCoprocessorHost(). 
- preSplitBeforePONR(this.splitrow, metaEntries)) { - throw new IOException("Coprocessor bypassing region " - + this.parent.getRegionInfo().getRegionNameAsString() + " split."); - } - try { - for (Mutation p : metaEntries) { - HRegionInfo.parseRegionName(p.getRow()); - } - } catch (IOException e) { - LOG.error("Row key of mutation from coprossor is not parsable as region name." - + "Mutations from coprocessor should only for hbase:meta table."); - throw e; - } - } - - // This is the point of no return. Adding subsequent edits to .META. as we - // do below when we do the daughter opens adding each to .META. can fail in - // various interesting ways the most interesting of which is a timeout - // BUT the edits all go through (See HBASE-3872). IF we reach the PONR - // then subsequent failures need to crash out this regionserver; the - // server shutdown processing should be able to fix-up the incomplete split. - // The offlined parent will have the daughters as extra columns. If - // we leave the daughter regions in place and do not remove them when we - // crash out, then they will have their references to the parent in place - // still and the server shutdown fixup of .META. will point to these - // regions. - // We should add PONR JournalEntry before offlineParentInMeta,so even if - // OfflineParentInMeta timeout,this will cause regionserver exit,and then - // master ServerShutdownHandler will fix daughter & avoid data loss. (See - // HBase-4562). - this.journal.add(new JournalEntry(JournalEntryType.PONR)); - - // Edit parent in meta. Offlines parent region and adds splita and splitb - // as an atomic update. See HBASE-7721. This update to META makes the region - // will determine whether the region is split or not in case of failures. - // If it is successful, master will roll-forward, if not, master will rollback - // and assign the parent region. 
- if (services != null && !services.reportRegionStateTransition(TransitionCode.SPLIT_PONR, - parent.getRegionInfo(), hri_a, hri_b)) { - // Passed PONR, let SSH clean it up - throw new IOException("Failed to notify master that split passed PONR: " - + parent.getRegionInfo().getRegionNameAsString()); - } - return daughterRegions; - } - - public PairOfSameType stepsBeforePONR(final Server server, - final RegionServerServices services, boolean testing) throws IOException { - if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT, - parent.getRegionInfo(), hri_a, hri_b)) { - throw new IOException("Failed to get ok from master to split " - + parent.getRegionInfo().getRegionNameAsString()); - } - this.journal.add(new JournalEntry(JournalEntryType.SET_SPLITTING)); - - this.parent.getRegionFileSystem().createSplitsDir(); - this.journal.add(new JournalEntry(JournalEntryType.CREATE_SPLIT_DIR)); - - Map> hstoreFilesToSplit = null; - Exception exceptionToThrow = null; - try{ - hstoreFilesToSplit = this.parent.close(false); - } catch (Exception e) { - exceptionToThrow = e; - } - if (exceptionToThrow == null && hstoreFilesToSplit == null) { - // The region was closed by a concurrent thread. We can't continue - // with the split, instead we must just abandon the split. If we - // reopen or split this could cause problems because the region has - // probably already been moved to a different server, or is in the - // process of moving to a different server. 
- exceptionToThrow = closedByOtherException; - } - if (exceptionToThrow != closedByOtherException) { - this.journal.add(new JournalEntry(JournalEntryType.CLOSED_PARENT_REGION)); - } - if (exceptionToThrow != null) { - if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow; - throw new IOException(exceptionToThrow); - } - if (!testing) { - services.removeFromOnlineRegions(this.parent, null); - } - this.journal.add(new JournalEntry(JournalEntryType.OFFLINED_PARENT)); - - // TODO: If splitStoreFiles were multithreaded would we complete steps in - // less elapsed time? St.Ack 20100920 - // - // splitStoreFiles creates daughter region dirs under the parent splits dir - // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will - // clean this up. - Pair expectedReferences = splitStoreFiles(hstoreFilesToSplit); - - // Log to the journal that we are creating region A, the first daughter - // region. We could fail halfway through. If we do, we could have left - // stuff in fs that needs cleanup -- a storefile or two. Thats why we - // add entry to journal BEFORE rather than AFTER the change. 
- this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_A_CREATION)); - assertReferenceFileCount(expectedReferences.getFirst(), - this.parent.getRegionFileSystem().getSplitsDir(this.hri_a)); - Region a = this.parent.createDaughterRegionFromSplits(this.hri_a); - assertReferenceFileCount(expectedReferences.getFirst(), - new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName())); - - // Ditto - this.journal.add(new JournalEntry(JournalEntryType.STARTED_REGION_B_CREATION)); - assertReferenceFileCount(expectedReferences.getSecond(), - this.parent.getRegionFileSystem().getSplitsDir(this.hri_b)); - Region b = this.parent.createDaughterRegionFromSplits(this.hri_b); - assertReferenceFileCount(expectedReferences.getSecond(), - new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName())); - - return new PairOfSameType(a, b); - } - - void assertReferenceFileCount(int expectedReferenceFileCount, Path dir) - throws IOException { - if (expectedReferenceFileCount != 0 && - expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(this.parent.getFilesystem(), dir)) { - throw new IOException("Failing split. Expected reference file count isn't equal."); - } - } - - /** - * Perform time consuming opening of the daughter regions. - * @param server Hosting server instance. Can be null when testing - * @param services Used to online/offline regions. - * @param a first daughter region - * @param a second daughter region - * @throws IOException If thrown, transaction failed. - * Call {@link #rollback(Server, RegionServerServices)} - */ - /* package */void openDaughters(final Server server, - final RegionServerServices services, Region a, Region b) - throws IOException { - boolean stopped = server != null && server.isStopped(); - boolean stopping = services != null && services.isStopping(); - // TODO: Is this check needed here? 
- if (stopped || stopping) { - LOG.info("Not opening daughters " + - b.getRegionInfo().getRegionNameAsString() + - " and " + - a.getRegionInfo().getRegionNameAsString() + - " because stopping=" + stopping + ", stopped=" + stopped); - } else { - // Open daughters in parallel. - DaughterOpener aOpener = new DaughterOpener(server, (HRegion)a); - DaughterOpener bOpener = new DaughterOpener(server, (HRegion)b); - aOpener.start(); - bOpener.start(); - try { - aOpener.join(); - if (aOpener.getException() == null) { - journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_A)); - } - bOpener.join(); - if (bOpener.getException() == null) { - journal.add(new JournalEntry(JournalEntryType.OPENED_REGION_B)); - } - } catch (InterruptedException e) { - throw (InterruptedIOException)new InterruptedIOException().initCause(e); - } - if (aOpener.getException() != null) { - throw new IOException("Failed " + - aOpener.getName(), aOpener.getException()); - } - if (bOpener.getException() != null) { - throw new IOException("Failed " + - bOpener.getName(), bOpener.getException()); - } - if (services != null) { - if (!services.reportRegionStateTransition(TransitionCode.SPLIT, - parent.getRegionInfo(), hri_a, hri_b)) { - throw new IOException("Failed to report split region to master: " - + parent.getRegionInfo().getShortNameToLog()); - } - // Should add it to OnlineRegions - services.addToOnlineRegions(b); - services.addToOnlineRegions(a); - } - } - } + boolean prepare() throws IOException; /** * Run the transaction. - * @param server Hosting server instance. Can be null when testing + * @param server Hosting server instance. Can be null when testing. * @param services Used to online/offline regions. * @throws IOException If thrown, transaction failed. 
* Call {@link #rollback(Server, RegionServerServices)} @@ -485,325 +192,44 @@ public class SplitTransaction { * @throws IOException * @see #rollback(Server, RegionServerServices) */ - public PairOfSameType execute(final Server server, - final RegionServerServices services) - throws IOException { - PairOfSameType regions = createDaughters(server, services); - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().preSplitAfterPONR(); - } - return stepsAfterPONR(server, services, regions); - } - - public PairOfSameType stepsAfterPONR(final Server server, - final RegionServerServices services, PairOfSameType regions) - throws IOException { - openDaughters(server, services, regions.getFirst(), regions.getSecond()); - journal.add(new JournalEntry(JournalEntryType.BEFORE_POST_SPLIT_HOOK)); - // Coprocessor callback - if (parent.getCoprocessorHost() != null) { - parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond()); - } - journal.add(new JournalEntry(JournalEntryType.AFTER_POST_SPLIT_HOOK)); - return regions; - } - - public Put addLocation(final Put p, final ServerName sn, long openSeqNum) { - p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, - Bytes.toBytes(sn.getHostAndPort())); - p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, - Bytes.toBytes(sn.getStartcode())); - p.addImmutable(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, - Bytes.toBytes(openSeqNum)); - return p; - } - - /* - * Open daughter region in its own thread. - * If we fail, abort this hosting server. - */ - class DaughterOpener extends HasThread { - private final Server server; - private final HRegion r; - private Throwable t = null; - - DaughterOpener(final Server s, final HRegion r) { - super((s == null? 
"null-services": s.getServerName()) + - "-daughterOpener=" + r.getRegionInfo().getEncodedName()); - setDaemon(true); - this.server = s; - this.r = r; - } - - /** - * @return Null if open succeeded else exception that causes us fail open. - * Call it after this thread exits else you may get wrong view on result. - */ - Throwable getException() { - return this.t; - } - - @Override - public void run() { - try { - openDaughterRegion(this.server, r); - } catch (Throwable t) { - this.t = t; - } - } - } - - /** - * Open daughter regions, add them to online list and update meta. - * @param server - * @param daughter - * @throws IOException - * @throws KeeperException - */ - void openDaughterRegion(final Server server, final HRegion daughter) - throws IOException, KeeperException { - HRegionInfo hri = daughter.getRegionInfo(); - LoggingProgressable reporter = server == null ? null - : new LoggingProgressable(hri, server.getConfiguration().getLong( - "hbase.regionserver.split.daughter.open.log.interval", 10000)); - daughter.openHRegion(reporter); - } - - static class LoggingProgressable implements CancelableProgressable { - private final HRegionInfo hri; - private long lastLog = -1; - private final long interval; - - LoggingProgressable(final HRegionInfo hri, final long interval) { - this.hri = hri; - this.interval = interval; - } - - @Override - public boolean progress() { - long now = EnvironmentEdgeManager.currentTime(); - if (now - lastLog > this.interval) { - LOG.info("Opening " + this.hri.getRegionNameAsString()); - this.lastLog = now; - } - return true; - } - } - - /** - * Creates reference files for top and bottom half of the - * @param hstoreFilesToSplit map of store files to create half file references for. - * @return the number of reference files that were created. 
- * @throws IOException - */ - private Pair splitStoreFiles( - final Map> hstoreFilesToSplit) - throws IOException { - if (hstoreFilesToSplit == null) { - // Could be null because close didn't succeed -- for now consider it fatal - throw new IOException("Close returned empty list of StoreFiles"); - } - // The following code sets up a thread pool executor with as many slots as - // there's files to split. It then fires up everything, waits for - // completion and finally checks for any exception - int nbFiles = hstoreFilesToSplit.size(); - if (nbFiles == 0) { - // no file needs to be splitted. - return new Pair(0,0); - } - LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent); - ThreadFactoryBuilder builder = new ThreadFactoryBuilder(); - builder.setNameFormat("StoreFileSplitter-%1$d"); - ThreadFactory factory = builder.build(); - ThreadPoolExecutor threadPool = - (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory); - List>> futures = new ArrayList>> (nbFiles); - - // Split each store file. - for (Map.Entry> entry: hstoreFilesToSplit.entrySet()) { - for (StoreFile sf: entry.getValue()) { - StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf); - futures.add(threadPool.submit(sfs)); - } - } - // Shutdown the pool - threadPool.shutdown(); - - // Wait for all the tasks to finish - try { - boolean stillRunning = !threadPool.awaitTermination( - this.fileSplitTimeout, TimeUnit.MILLISECONDS); - if (stillRunning) { - threadPool.shutdownNow(); - // wait for the thread to shutdown completely. 
- while (!threadPool.isTerminated()) { - Thread.sleep(50); - } - throw new IOException("Took too long to split the" + - " files and create the references, aborting split"); - } - } catch (InterruptedException e) { - throw (InterruptedIOException)new InterruptedIOException().initCause(e); - } - - int created_a = 0; - int created_b = 0; - // Look for any exception - for (Future> future : futures) { - try { - Pair p = future.get(); - created_a += p.getFirst() != null ? 1 : 0; - created_b += p.getSecond() != null ? 1 : 0; - } catch (InterruptedException e) { - throw (InterruptedIOException) new InterruptedIOException().initCause(e); - } catch (ExecutionException e) { - throw new IOException(e); - } - } - - if (LOG.isDebugEnabled()) { - LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a - + " storefiles, Daugther B: " + created_b + " storefiles."); - } - return new Pair(created_a, created_b); - } - - private Pair splitStoreFile(final byte[] family, final StoreFile sf) throws IOException { - HRegionFileSystem fs = this.parent.getRegionFileSystem(); - String familyName = Bytes.toString(family); - Path path_a = - fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false, - this.parent.getSplitPolicy()); - Path path_b = - fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true, - this.parent.getSplitPolicy()); - return new Pair(path_a, path_b); - } - - /** - * Utility class used to do the file splitting / reference writing - * in parallel instead of sequentially. 
- */ - class StoreFileSplitter implements Callable> { - private final byte[] family; - private final StoreFile sf; - - /** - * Constructor that takes what it needs to split - * @param family Family that contains the store file - * @param sf which file - */ - public StoreFileSplitter(final byte[] family, final StoreFile sf) { - this.sf = sf; - this.family = family; - } - - public Pair call() throws IOException { - return splitStoreFile(family, sf); - } - } + PairOfSameType execute(Server server, RegionServerServices services) throws IOException; /** + * Roll back a failed transaction * @param server Hosting server instance (May be null when testing). * @param services * @throws IOException If thrown, rollback failed. Take drastic action. * @return True if we successfully rolled back, false if we got to the point * of no return and so now need to abort the server to minimize damage. */ - @SuppressWarnings("deprecation") - public boolean rollback(final Server server, final RegionServerServices services) - throws IOException { - // Coprocessor callback - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().preRollBackSplit(); - } + boolean rollback(Server server, RegionServerServices services) throws IOException; - boolean result = true; - ListIterator iterator = - this.journal.listIterator(this.journal.size()); - // Iterate in reverse. - while (iterator.hasPrevious()) { - JournalEntry je = iterator.previous(); - switch(je.type) { + /** + * Register a listener for transaction preparation, execution, and possibly + * rollback phases. + *

A listener can abort a transaction by throwing an exception. + * @param listener the listener + * @return 'this' for chaining + */ + SplitTransaction registerTransactionListener(TransactionListener listener); - case SET_SPLITTING: - if (services != null - && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED, - parent.getRegionInfo(), hri_a, hri_b)) { - return false; - } - break; + /** + * Get the journal for the transaction. + *

Journal entries are an opaque type represented as JournalEntry. They can + * also provide useful debugging information via their toString method. + * @return the transaction journal + */ + List getJournal(); - case CREATE_SPLIT_DIR: - this.parent.writestate.writesEnabled = true; - this.parent.getRegionFileSystem().cleanupSplitsDir(); - break; + /** + * Get the Server running the transaction or rollback + * @return server instance + */ + Server getServer(); - case CLOSED_PARENT_REGION: - try { - // So, this returns a seqid but if we just closed and then reopened, we - // should be ok. On close, we flushed using sequenceid obtained from - // hosting regionserver so no need to propagate the sequenceid returned - // out of initialize below up into regionserver as we normally do. - // TODO: Verify. - this.parent.initialize(); - } catch (IOException e) { - LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " + - this.parent.getRegionInfo().getRegionNameAsString(), e); - throw new RuntimeException(e); - } - break; - - case STARTED_REGION_A_CREATION: - this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a); - break; - - case STARTED_REGION_B_CREATION: - this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b); - break; - - case OFFLINED_PARENT: - if (services != null) services.addToOnlineRegions(this.parent); - break; - - case PONR: - // We got to the point-of-no-return so we need to just abort. Return - // immediately. Do not clean up created daughter regions. They need - // to be in place so we don't delete the parent region mistakenly. - // See HBASE-3872. 
- return false; - - // Informational only cases - case STARTED: - case PREPARED: - case BEFORE_PRE_SPLIT_HOOK: - case AFTER_PRE_SPLIT_HOOK: - case BEFORE_POST_SPLIT_HOOK: - case AFTER_POST_SPLIT_HOOK: - case OPENED_REGION_A: - case OPENED_REGION_B: - break; - - default: - throw new RuntimeException("Unhandled journal entry: " + je); - } - } - // Coprocessor callback - if (this.parent.getCoprocessorHost() != null) { - this.parent.getCoprocessorHost().postRollBackSplit(); - } - return result; - } - - HRegionInfo getFirstDaughter() { - return hri_a; - } - - HRegionInfo getSecondDaughter() { - return hri_b; - } - - List getJournal() { - return journal; - } + /** + * Get the RegonServerServices of the server running the transaction or rollback + * @return region server services + */ + RegionServerServices getRegionServerServices(); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java new file mode 100644 index 00000000000..7df8233f96a --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionFactory.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.regionserver; + +import org.apache.hadoop.conf.Configurable; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseInterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.classification.InterfaceStability; +import org.apache.hadoop.hbase.util.ReflectionUtils; + +/** + * A factory for creating SplitTransactions, which execute region split as a "transaction". + * See {@link SplitTransaction} + */ +@InterfaceAudience.LimitedPrivate(HBaseInterfaceAudience.COPROC) +@InterfaceStability.Evolving +public class SplitTransactionFactory implements Configurable { + + public static final String SPLIT_TRANSACTION_IMPL_KEY = + "hbase.regionserver.split.transaction.impl"; + + private Configuration conf; + + public SplitTransactionFactory(Configuration conf) { + this.conf = conf; + } + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + } + + /** + * Create a split transaction + * @param r the region to split + * @param splitrow the split point in the keyspace + * @return transaction instance + */ + public SplitTransaction create(final Region r, final byte [] splitrow) { + return ReflectionUtils.instantiateWithCustomCtor( + // The implementation class must extend SplitTransactionImpl, not only + // implement the SplitTransaction interface like you might expect, + // because various places such as AssignmentManager use static methods + // from SplitTransactionImpl. Whatever we use for implementation must + // be compatible, so it's safest to require ? extends SplitTransactionImpl. + // If not compatible we will throw a runtime exception from here. 
+ conf.getClass(SPLIT_TRANSACTION_IMPL_KEY, SplitTransactionImpl.class, + SplitTransactionImpl.class).getName(), + new Class[] { Region.class, byte[].class }, + new Object[] { r, splitrow }); + } + +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java new file mode 100644 index 00000000000..8695c774aea --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/SplitTransactionImpl.java @@ -0,0 +1,789 @@ +/** + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.ListIterator;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.classification.InterfaceAudience;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionInfo;
+import org.apache.hadoop.hbase.Server;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.protobuf.generated.RegionServerStatusProtos.RegionStateTransition.TransitionCode;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
+import org.apache.hadoop.hbase.util.FSUtils;
+import org.apache.hadoop.hbase.util.HasThread;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.hbase.util.PairOfSameType;
+import org.apache.zookeeper.KeeperException;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+/**
+ * Executes the split of one region into two daughter regions as a sequence of
+ * journaled phases.
+ * <p>
+ * Every phase change goes through {@link #transition(SplitTransactionPhase, boolean)},
+ * which appends to the journal and notifies the registered listeners.
+ * {@link #rollback(Server, RegionServerServices)} walks the journal in reverse
+ * and undoes each phase; once the {@code PONR} phase has been journaled,
+ * rollback refuses to undo anything and returns {@code false}.
+ * <p>
+ * Typical use: {@link #prepare()}, then
+ * {@link #execute(Server, RegionServerServices)}, and on an {@link IOException}
+ * from execute, {@link #rollback(Server, RegionServerServices)}.
+ */
+@InterfaceAudience.Private
+public class SplitTransactionImpl implements SplitTransaction {
+  private static final Log LOG = LogFactory.getLog(SplitTransactionImpl.class);
+
+  /*
+   * Region to split
+   */
+  private final HRegion parent;
+  private HRegionInfo hri_a;
+  private HRegionInfo hri_b;
+  // Milliseconds to wait for all store file splits; may be overridden from
+  // "hbase.regionserver.fileSplitTimeout" in createDaughters().
+  private long fileSplitTimeout = 30000;
+
+  /*
+   * Row to split around
+   */
+  private final byte [] splitrow;
+
+  /*
+   * Transaction state for listener, only valid during execute and
+   * rollback
+   */
+  private SplitTransactionPhase currentPhase = SplitTransactionPhase.STARTED;
+  private Server server;
+  private RegionServerServices rsServices;
+
+  /**
+   * Immutable journal record: a phase and the time at which it was entered.
+   */
+  public static class JournalEntryImpl implements JournalEntry {
+    private final SplitTransactionPhase type;
+    private final long timestamp;
+
+    /**
+     * Creates an entry stamped with the current time.
+     * @param type phase being entered
+     */
+    public JournalEntryImpl(SplitTransactionPhase type) {
+      this(type, EnvironmentEdgeManager.currentTime());
+    }
+
+    /**
+     * @param type phase being entered
+     * @param timestamp time to record for the entry
+     */
+    public JournalEntryImpl(SplitTransactionPhase type, long timestamp) {
+      this.type = type;
+      this.timestamp = timestamp;
+    }
+
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append(type);
+      sb.append(" at ");
+      sb.append(timestamp);
+      return sb.toString();
+    }
+
+    @Override
+    public SplitTransactionPhase getPhase() {
+      return type;
+    }
+
+    @Override
+    public long getTimeStamp() {
+      return timestamp;
+    }
+  }
+
+  /*
+   * Journal of how far the split transaction has progressed.
+   */
+  private final ArrayList<JournalEntry> journal = new ArrayList<JournalEntry>();
+
+  /**
+   * Listeners
+   */
+  private final ArrayList<TransactionListener> listeners = new ArrayList<TransactionListener>();
+
+  /**
+   * Constructor
+   * @param r Region to split
+   * @param splitrow Row to split around
+   */
+  public SplitTransactionImpl(final Region r, final byte [] splitrow) {
+    this.parent = (HRegion)r;
+    this.splitrow = splitrow;
+    this.journal.add(new JournalEntryImpl(SplitTransactionPhase.STARTED));
+  }
+
+  /**
+   * Moves forward to {@code nextPhase}, journaling it and notifying listeners.
+   * @param nextPhase phase being entered
+   * @throws IOException if a listener throws
+   */
+  private void transition(SplitTransactionPhase nextPhase) throws IOException {
+    transition(nextPhase, false);
+  }
+
+  /**
+   * Moves to {@code nextPhase} and notifies every registered listener.
+   * @param nextPhase phase being entered (forward) or being undone (rollback)
+   * @param isRollback when true nothing is added to the journal and listeners
+   *   receive {@code rollback} instead of {@code transition}
+   * @throws IOException if a listener throws
+   */
+  private void transition(SplitTransactionPhase nextPhase, boolean isRollback)
+      throws IOException {
+    if (!isRollback) {
+      // Add to the journal first, because if the listener throws an exception
+      // we need to roll back starting at 'nextPhase'
+      this.journal.add(new JournalEntryImpl(nextPhase));
+    }
+    for (int i = 0; i < listeners.size(); i++) {
+      TransactionListener listener = listeners.get(i);
+      if (!isRollback) {
+        listener.transition(this, currentPhase, nextPhase);
+      } else {
+        listener.rollback(this, currentPhase, nextPhase);
+      }
+    }
+    currentPhase = nextPhase;
+  }
+
+  /**
+   * Validates the split row and computes the two daughter region infos.
+   * @return false if the parent is not splittable, the split row is null,
+   *   equals the parent start key, or is outside the parent key range;
+   *   true once the transaction has entered the PREPARED phase
+   * @throws IOException if a listener throws during the phase transition
+   */
+  @Override
+  public boolean prepare() throws IOException {
+    if (!this.parent.isSplittable()) return false;
+    // Split key can be null if this region is unsplittable; i.e. has refs.
+    if (this.splitrow == null) return false;
+    HRegionInfo hri = this.parent.getRegionInfo();
+    parent.prepareToSplit();
+    // Check splitrow.
+    byte [] startKey = hri.getStartKey();
+    byte [] endKey = hri.getEndKey();
+    if (Bytes.equals(startKey, splitrow) ||
+        !this.parent.getRegionInfo().containsRow(splitrow)) {
+      LOG.info("Split row is not inside region key range or is equal to " +
+          "startkey: " + Bytes.toStringBinary(this.splitrow));
+      return false;
+    }
+    long rid = getDaughterRegionIdTimestamp(hri);
+    this.hri_a = new HRegionInfo(hri.getTable(), startKey, this.splitrow, false, rid);
+    this.hri_b = new HRegionInfo(hri.getTable(), this.splitrow, endKey, false, rid);
+
+    transition(SplitTransactionPhase.PREPARED);
+
+    return true;
+  }
+
+  /**
+   * Calculate daughter regionid to use.
+   * @param hri Parent {@link HRegionInfo}
+   * @return Daughter region id (timestamp) to use.
+   */
+  private static long getDaughterRegionIdTimestamp(final HRegionInfo hri) {
+    long rid = EnvironmentEdgeManager.currentTime();
+    // Regionid is timestamp.  Can't be less than that of parent else will insert
+    // at wrong location in hbase:meta (See HBASE-710).
+    if (rid < hri.getRegionId()) {
+      LOG.warn("Clock skew; parent regions id is " + hri.getRegionId() +
+        " but current time here is " + rid);
+      rid = hri.getRegionId() + 1;
+    }
+    return rid;
+  }
+
+  // Sentinel compared by identity in stepsBeforePONR() to tell "parent was
+  // closed by somebody else" apart from a failure of our own close.
+  private static final IOException closedByOtherException = new IOException(
+      "Failed to close region: already closed by another thread");
+
+  /**
+   * Prepare the regions and region files.
+   * @param server Hosting server instance.  Can be null when testing (won't try
+   * and update in zk if a null server)
+   * @param services Used to online/offline regions.
+   * @throws IOException If thrown, transaction failed.
+   *    Call {@link #rollback(Server, RegionServerServices)}
+   * @return Regions created
+   */
+  @VisibleForTesting
+  PairOfSameType<Region> createDaughters(final Server server,
+      final RegionServerServices services) throws IOException {
+    LOG.info("Starting split of region " + this.parent);
+    if ((server != null && server.isStopped()) ||
+        (services != null && services.isStopping())) {
+      throw new IOException("Server is stopped or stopping");
+    }
+    assert !this.parent.lock.writeLock().isHeldByCurrentThread():
+      "Unsafe to hold write lock while performing RPCs";
+
+    transition(SplitTransactionPhase.BEFORE_PRE_SPLIT_HOOK);
+
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      // TODO: Remove one of these
+      this.parent.getCoprocessorHost().preSplit();
+      this.parent.getCoprocessorHost().preSplit(this.splitrow);
+    }
+
+    transition(SplitTransactionPhase.AFTER_PRE_SPLIT_HOOK);
+
+    // If true, no cluster to write meta edits to or to update znodes in.
+    boolean testing = server == null? true:
+        server.getConfiguration().getBoolean("hbase.testing.nocluster", false);
+    this.fileSplitTimeout = testing ? this.fileSplitTimeout :
+        server.getConfiguration().getLong("hbase.regionserver.fileSplitTimeout",
+          this.fileSplitTimeout);
+
+    PairOfSameType<Region> daughterRegions = stepsBeforePONR(server, services, testing);
+
+    List<Mutation> metaEntries = new ArrayList<Mutation>();
+    if (this.parent.getCoprocessorHost() != null) {
+      if (this.parent.getCoprocessorHost().
+          preSplitBeforePONR(this.splitrow, metaEntries)) {
+        throw new IOException("Coprocessor bypassing region "
+            + parent.getRegionInfo().getRegionNameAsString() + " split.");
+      }
+      try {
+        // Validation only: every mutation row must parse as a region name.
+        for (Mutation p : metaEntries) {
+          HRegionInfo.parseRegionName(p.getRow());
+        }
+      } catch (IOException e) {
+        LOG.error("Row key of mutation from coprossor is not parsable as region name."
+            + "Mutations from coprocessor should only for hbase:meta table.");
+        throw e;
+      }
+    }
+
+    // This is the point of no return.  Adding subsequent edits to .META. as we
+    // do below when we do the daughter opens adding each to .META. can fail in
+    // various interesting ways the most interesting of which is a timeout
+    // BUT the edits all go through (See HBASE-3872).  IF we reach the PONR
+    // then subsequent failures need to crash out this regionserver; the
+    // server shutdown processing should be able to fix-up the incomplete split.
+    // The offlined parent will have the daughters as extra columns.  If
+    // we leave the daughter regions in place and do not remove them when we
+    // crash out, then they will have their references to the parent in place
+    // still and the server shutdown fixup of .META. will point to these
+    // regions.
+    // We should add the PONR JournalEntry before offlineParentInMeta, so even
+    // if offlineParentInMeta times out, this will cause the regionserver to
+    // exit, and then the master ServerShutdownHandler will fix the daughters
+    // and avoid data loss. (See HBASE-4562).
+
+    transition(SplitTransactionPhase.PONR);
+
+    // Edit parent in meta.  Offlines parent region and adds splita and splitb
+    // as an atomic update. See HBASE-7721. This update to META will determine
+    // whether the region is split or not in case of failures.
+    // If it is successful, master will roll-forward, if not, master will rollback
+    // and assign the parent region.
+    if (services != null && !services.reportRegionStateTransition(TransitionCode.SPLIT_PONR,
+        parent.getRegionInfo(), hri_a, hri_b)) {
+      // Passed PONR, let SSH clean it up
+      throw new IOException("Failed to notify master that split passed PONR: "
+        + parent.getRegionInfo().getRegionNameAsString());
+    }
+    return daughterRegions;
+  }
+
+  /**
+   * Adds server host:port, start code and open sequence number columns of the
+   * catalog family to {@code p}.
+   * @param p put to add the columns to
+   * @param sn server the region is located on
+   * @param openSeqNum sequence number to record
+   * @return {@code p}, for chaining
+   */
+  @VisibleForTesting
+  Put addLocation(final Put p, final ServerName sn, long openSeqNum) {
+    p.add(HConstants.CATALOG_FAMILY, HConstants.SERVER_QUALIFIER, Bytes
+        .toBytes(sn.getHostAndPort()));
+    p.add(HConstants.CATALOG_FAMILY, HConstants.STARTCODE_QUALIFIER, Bytes.toBytes(sn
+        .getStartcode()));
+    p.add(HConstants.CATALOG_FAMILY, HConstants.SEQNUM_QUALIFIER, Bytes.toBytes(openSeqNum));
+    return p;
+  }
+
+  /**
+   * Runs every step that can still be rolled back: notifies the master,
+   * creates the splits dir, closes and offlines the parent, writes the
+   * reference files and creates both daughter regions on the filesystem.
+   * @param server Hosting server instance (not used by this method)
+   * @param services Used to report to the master and to offline the parent;
+   *   may be null, in which case those steps are skipped
+   * @param testing when true the parent is not removed from the online regions
+   * @return the two daughter regions, first then second
+   * @throws IOException If thrown, transaction failed.
+   *    Call {@link #rollback(Server, RegionServerServices)}
+   */
+  @VisibleForTesting
+  public PairOfSameType<Region> stepsBeforePONR(final Server server,
+      final RegionServerServices services, boolean testing) throws IOException {
+    if (services != null && !services.reportRegionStateTransition(TransitionCode.READY_TO_SPLIT,
+        parent.getRegionInfo(), hri_a, hri_b)) {
+      throw new IOException("Failed to get ok from master to split "
+        + parent.getRegionInfo().getRegionNameAsString());
+    }
+
+    transition(SplitTransactionPhase.SET_SPLITTING);
+
+    this.parent.getRegionFileSystem().createSplitsDir();
+
+    transition(SplitTransactionPhase.CREATE_SPLIT_DIR);
+
+    Map<byte[], List<StoreFile>> hstoreFilesToSplit = null;
+    Exception exceptionToThrow = null;
+    try {
+      hstoreFilesToSplit = this.parent.close(false);
+    } catch (Exception e) {
+      exceptionToThrow = e;
+    }
+    if (exceptionToThrow == null && hstoreFilesToSplit == null) {
+      // The region was closed by a concurrent thread.  We can't continue
+      // with the split, instead we must just abandon the split.  If we
+      // reopen or split this could cause problems because the region has
+      // probably already been moved to a different server, or is in the
+      // process of moving to a different server.
+      exceptionToThrow = closedByOtherException;
+    }
+    if (exceptionToThrow != closedByOtherException) {
+      // Journal the close even when our own close threw, so that rollback
+      // re-initializes the parent; skip it only when someone else closed it.
+      transition(SplitTransactionPhase.CLOSED_PARENT_REGION);
+    }
+    if (exceptionToThrow != null) {
+      if (exceptionToThrow instanceof IOException) throw (IOException)exceptionToThrow;
+      throw new IOException(exceptionToThrow);
+    }
+    // services is null-checked like every other use in this class.
+    if (!testing && services != null) {
+      services.removeFromOnlineRegions(this.parent, null);
+    }
+
+    transition(SplitTransactionPhase.OFFLINED_PARENT);
+
+    // TODO: If splitStoreFiles were multithreaded would we complete steps in
+    // less elapsed time?  St.Ack 20100920
+    //
+    // splitStoreFiles creates daughter region dirs under the parent splits dir
+    // Nothing to unroll here if failure -- clean up of CREATE_SPLIT_DIR will
+    // clean this up.
+    Pair<Integer, Integer> expectedReferences = splitStoreFiles(hstoreFilesToSplit);
+
+    // Log to the journal that we are creating region A, the first daughter
+    // region.  We could fail halfway through.  If we do, we could have left
+    // stuff in fs that needs cleanup -- a storefile or two.  That's why we
+    // add entry to journal BEFORE rather than AFTER the change.
+
+    transition(SplitTransactionPhase.STARTED_REGION_A_CREATION);
+
+    assertReferenceFileCount(expectedReferences.getFirst(),
+        this.parent.getRegionFileSystem().getSplitsDir(this.hri_a));
+    HRegion a = this.parent.createDaughterRegionFromSplits(this.hri_a);
+    assertReferenceFileCount(expectedReferences.getFirst(),
+        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_a.getEncodedName()));
+
+    // Ditto
+
+    transition(SplitTransactionPhase.STARTED_REGION_B_CREATION);
+
+    assertReferenceFileCount(expectedReferences.getSecond(),
+        this.parent.getRegionFileSystem().getSplitsDir(this.hri_b));
+    HRegion b = this.parent.createDaughterRegionFromSplits(this.hri_b);
+    assertReferenceFileCount(expectedReferences.getSecond(),
+        new Path(this.parent.getRegionFileSystem().getTableDir(), this.hri_b.getEncodedName()));
+
+    return new PairOfSameType<Region>(a, b);
+  }
+
+  /**
+   * Fails the split when the number of reference files under {@code dir}
+   * differs from the expected count. An expected count of zero is not checked.
+   * @param expectedReferenceFileCount number of reference files written
+   * @param dir directory to count reference files in
+   * @throws IOException if the counts differ
+   */
+  @VisibleForTesting
+  void assertReferenceFileCount(int expectedReferenceFileCount, Path dir)
+      throws IOException {
+    if (expectedReferenceFileCount != 0 &&
+        expectedReferenceFileCount != FSUtils.getRegionReferenceFileCount(parent.getFilesystem(),
+          dir)) {
+      throw new IOException("Failing split. Expected reference file count isn't equal.");
+    }
+  }
+
+  /**
+   * Perform time consuming opening of the daughter regions.
+   * @param server Hosting server instance.  Can be null when testing
+   * @param services Used to online/offline regions.
+   * @param a first daughter region
+   * @param b second daughter region
+   * @throws IOException If thrown, transaction failed.
+   *          Call {@link #rollback(Server, RegionServerServices)}
+   */
+  @VisibleForTesting
+  void openDaughters(final Server server, final RegionServerServices services, Region a,
+      Region b) throws IOException {
+    boolean stopped = server != null && server.isStopped();
+    boolean stopping = services != null && services.isStopping();
+    // TODO: Is this check needed here?
+    if (stopped || stopping) {
+      LOG.info("Not opening daughters " +
+          b.getRegionInfo().getRegionNameAsString() +
+          " and " +
+          a.getRegionInfo().getRegionNameAsString() +
+          " because stopping=" + stopping + ", stopped=" + stopped);
+    } else {
+      // Open daughters in parallel.
+      DaughterOpener aOpener = new DaughterOpener(server, a);
+      DaughterOpener bOpener = new DaughterOpener(server, b);
+      aOpener.start();
+      bOpener.start();
+      try {
+        aOpener.join();
+        if (aOpener.getException() == null) {
+          transition(SplitTransactionPhase.OPENED_REGION_A);
+        }
+        bOpener.join();
+        if (bOpener.getException() == null) {
+          transition(SplitTransactionPhase.OPENED_REGION_B);
+        }
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+      }
+      if (aOpener.getException() != null) {
+        throw new IOException("Failed " +
+          aOpener.getName(), aOpener.getException());
+      }
+      if (bOpener.getException() != null) {
+        throw new IOException("Failed " +
+          bOpener.getName(), bOpener.getException());
+      }
+      if (services != null) {
+        if (!services.reportRegionStateTransition(TransitionCode.SPLIT,
+            parent.getRegionInfo(), hri_a, hri_b)) {
+          throw new IOException("Failed to report split region to master: "
+            + parent.getRegionInfo().getShortNameToLog());
+        }
+        // Should add it to OnlineRegions
+        services.addToOnlineRegions(b);
+        services.addToOnlineRegions(a);
+      }
+    }
+  }
+
+  /**
+   * Runs the split: the steps up to and including the point of no return,
+   * then the daughter opens and post-split hooks.
+   * @param server Hosting server instance.  Can be null when testing
+   * @param services Used to online/offline regions.
+   * @return the two daughter regions
+   * @throws IOException If thrown, transaction failed.
+   *          Call {@link #rollback(Server, RegionServerServices)}
+   */
+  @Override
+  public PairOfSameType<Region> execute(final Server server, final RegionServerServices services)
+      throws IOException {
+    this.server = server;
+    this.rsServices = services;
+    PairOfSameType<Region> regions = createDaughters(server, services);
+    stepsAfterPONR(server, services, regions);
+    transition(SplitTransactionPhase.COMPLETED);
+    return regions;
+  }
+
+  /**
+   * Opens the daughters and invokes the coprocessor hooks that run after the
+   * point of no return.
+   * @param server Hosting server instance.  Can be null when testing
+   * @param services Used to online/offline regions.
+   * @param regions the daughter regions returned by createDaughters
+   * @throws IOException if a hook, a listener or a daughter open fails
+   */
+  @VisibleForTesting
+  void stepsAfterPONR(final Server server,
+      final RegionServerServices services, PairOfSameType<Region> regions)
+      throws IOException {
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preSplitAfterPONR();
+    }
+
+    openDaughters(server, services, regions.getFirst(), regions.getSecond());
+
+    transition(SplitTransactionPhase.BEFORE_POST_SPLIT_HOOK);
+
+    // Coprocessor callback
+    if (parent.getCoprocessorHost() != null) {
+      parent.getCoprocessorHost().postSplit(regions.getFirst(), regions.getSecond());
+    }
+
+    transition(SplitTransactionPhase.AFTER_POST_SPLIT_HOOK);
+  }
+
+  /*
+   * Open daughter region in its own thread.
+   * Any failure is captured and made available through getException().
+   */
+  private class DaughterOpener extends HasThread {
+    private final Server server;
+    private final Region r;
+    private Throwable t = null;
+
+    DaughterOpener(final Server s, final Region r) {
+      super((s == null? "null-services": s.getServerName()) +
+        "-daughterOpener=" + r.getRegionInfo().getEncodedName());
+      setDaemon(true);
+      this.server = s;
+      this.r = r;
+    }
+
+    /**
+     * @return Null if open succeeded else exception that causes us fail open.
+     * Call it after this thread exits else you may get wrong view on result.
+     */
+    Throwable getException() {
+      return this.t;
+    }
+
+    @Override
+    public void run() {
+      try {
+        openDaughterRegion(this.server, r);
+      } catch (Throwable t) {
+        this.t = t;
+      }
+    }
+  }
+
+  /**
+   * Opens one daughter region, logging progress periodically when a server is
+   * available to read the log interval from.
+   * @param server Hosting server instance; when null no progress is reported
+   * @param daughter region to open
+   * @throws IOException if the open fails
+   * @throws KeeperException declared for callers; not thrown by the code here
+   */
+  @VisibleForTesting
+  void openDaughterRegion(final Server server, final Region daughter)
+      throws IOException, KeeperException {
+    HRegionInfo hri = daughter.getRegionInfo();
+    LoggingProgressable reporter = server == null ? null
+        : new LoggingProgressable(hri, server.getConfiguration().getLong(
+            "hbase.regionserver.split.daughter.open.log.interval", 10000));
+    ((HRegion)daughter).openHRegion(reporter);
+  }
+
+  /**
+   * Progress callback that logs "Opening ..." at most once per interval and
+   * never requests cancellation.
+   */
+  static class LoggingProgressable implements CancelableProgressable {
+    private final HRegionInfo hri;
+    private long lastLog = -1;
+    private final long interval;
+
+    LoggingProgressable(final HRegionInfo hri, final long interval) {
+      this.hri = hri;
+      this.interval = interval;
+    }
+
+    @Override
+    public boolean progress() {
+      long now = EnvironmentEdgeManager.currentTime();
+      if (now - lastLog > this.interval) {
+        LOG.info("Opening " + this.hri.getRegionNameAsString());
+        this.lastLog = now;
+      }
+      return true;
+    }
+  }
+
+  /**
+   * Creates reference files for the top and bottom half of each store file of
+   * the parent, splitting the files in parallel.
+   * @param hstoreFilesToSplit map of store files to create half file references for.
+   * @return the number of reference files created for the first daughter and
+   *   for the second daughter, in that order.
+   * @throws IOException if the map is null, the split times out, is
+   *   interrupted, or any single file split fails
+   */
+  private Pair<Integer, Integer> splitStoreFiles(
+      final Map<byte[], List<StoreFile>> hstoreFilesToSplit)
+      throws IOException {
+    if (hstoreFilesToSplit == null) {
+      // Could be null because close didn't succeed -- for now consider it fatal
+      throw new IOException("Close returned empty list of StoreFiles");
+    }
+    // The following code sets up a thread pool executor with as many slots as
+    // there's files to split. It then fires up everything, waits for
+    // completion and finally checks for any exception
+    //
+    // Count the files themselves: the map has one entry per column family, so
+    // its size() would give a single-threaded pool for a one-family table.
+    int nbFiles = 0;
+    for (List<StoreFile> familyFiles : hstoreFilesToSplit.values()) {
+      nbFiles += familyFiles.size();
+    }
+    if (nbFiles == 0) {
+      // no file needs to be splitted.
+      return new Pair<Integer, Integer>(0,0);
+    }
+    LOG.info("Preparing to split " + nbFiles + " storefiles for region " + this.parent);
+    ThreadFactoryBuilder builder = new ThreadFactoryBuilder();
+    builder.setNameFormat("StoreFileSplitter-%1$d");
+    ThreadFactory factory = builder.build();
+    ThreadPoolExecutor threadPool =
+      (ThreadPoolExecutor) Executors.newFixedThreadPool(nbFiles, factory);
+    List<Future<Pair<Path, Path>>> futures = new ArrayList<Future<Pair<Path, Path>>>(nbFiles);
+
+    // Split each store file.
+    for (Map.Entry<byte[], List<StoreFile>> entry: hstoreFilesToSplit.entrySet()) {
+      for (StoreFile sf: entry.getValue()) {
+        StoreFileSplitter sfs = new StoreFileSplitter(entry.getKey(), sf);
+        futures.add(threadPool.submit(sfs));
+      }
+    }
+    // Shutdown the pool
+    threadPool.shutdown();
+
+    // Wait for all the tasks to finish
+    try {
+      boolean stillRunning = !threadPool.awaitTermination(
+          this.fileSplitTimeout, TimeUnit.MILLISECONDS);
+      if (stillRunning) {
+        threadPool.shutdownNow();
+        // wait for the thread to shutdown completely.
+        while (!threadPool.isTerminated()) {
+          Thread.sleep(50);
+        }
+        throw new IOException("Took too long to split the" +
+            " files and create the references, aborting split");
+      }
+    } catch (InterruptedException e) {
+      throw (InterruptedIOException)new InterruptedIOException().initCause(e);
+    }
+
+    int created_a = 0;
+    int created_b = 0;
+    // Look for any exception
+    for (Future<Pair<Path, Path>> future : futures) {
+      try {
+        Pair<Path, Path> p = future.get();
+        // A null path means no reference file was written for that daughter.
+        created_a += p.getFirst() != null ? 1 : 0;
+        created_b += p.getSecond() != null ? 1 : 0;
+      } catch (InterruptedException e) {
+        throw (InterruptedIOException) new InterruptedIOException().initCause(e);
+      } catch (ExecutionException e) {
+        throw new IOException(e);
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Split storefiles for region " + this.parent + " Daugther A: " + created_a
+          + " storefiles, Daugther B: " + created_b + " storefiles.");
+    }
+    return new Pair<Integer, Integer>(created_a, created_b);
+  }
+
+  /**
+   * Writes the reference files of one store file for both daughters.
+   * @param family column family the store file belongs to
+   * @param sf store file to split
+   * @return paths returned by the region filesystem for the first and the
+   *   second daughter
+   * @throws IOException if writing a reference file fails
+   */
+  private Pair<Path, Path> splitStoreFile(final byte[] family, final StoreFile sf)
+      throws IOException {
+    HRegionFileSystem fs = this.parent.getRegionFileSystem();
+    String familyName = Bytes.toString(family);
+    Path path_a =
+        fs.splitStoreFile(this.hri_a, familyName, sf, this.splitrow, false,
+          this.parent.getSplitPolicy());
+    Path path_b =
+        fs.splitStoreFile(this.hri_b, familyName, sf, this.splitrow, true,
+          this.parent.getSplitPolicy());
+    return new Pair<Path, Path>(path_a, path_b);
+  }
+
+  /**
+   * Utility class used to do the file splitting / reference writing
+   * in parallel instead of sequentially.
+   */
+  private class StoreFileSplitter implements Callable<Pair<Path, Path>> {
+    private final byte[] family;
+    private final StoreFile sf;
+
+    /**
+     * Constructor that takes what it needs to split
+     * @param family Family that contains the store file
+     * @param sf which file
+     */
+    public StoreFileSplitter(final byte[] family, final StoreFile sf) {
+      this.sf = sf;
+      this.family = family;
+    }
+
+    @Override
+    public Pair<Path, Path> call() throws IOException {
+      return splitStoreFile(family, sf);
+    }
+  }
+
+  /**
+   * Undoes the journaled phases in reverse order.
+   * @param server Hosting server instance (recorded for listeners)
+   * @param services Used to report to the master and re-online the parent;
+   *   may be null, in which case those steps are skipped
+   * @return false if the journal contains the PONR phase or the master did
+   *   not accept the SPLIT_REVERTED report; true if every phase was undone
+   * @throws IOException if a hook, a listener or a filesystem cleanup fails
+   */
+  @Override
+  public boolean rollback(final Server server, final RegionServerServices services)
+      throws IOException {
+    this.server = server;
+    this.rsServices = services;
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().preRollBackSplit();
+    }
+
+    boolean result = true;
+    ListIterator<JournalEntry> iterator =
+      this.journal.listIterator(this.journal.size());
+    // Iterate in reverse.
+    while (iterator.hasPrevious()) {
+      JournalEntry je = iterator.previous();
+
+      transition(je.getPhase(), true);
+
+      switch (je.getPhase()) {
+
+      case SET_SPLITTING:
+        if (services != null
+            && !services.reportRegionStateTransition(TransitionCode.SPLIT_REVERTED,
+                parent.getRegionInfo(), hri_a, hri_b)) {
+          return false;
+        }
+        break;
+
+      case CREATE_SPLIT_DIR:
+        this.parent.writestate.writesEnabled = true;
+        this.parent.getRegionFileSystem().cleanupSplitsDir();
+        break;
+
+      case CLOSED_PARENT_REGION:
+        try {
+          // So, this returns a seqid but if we just closed and then reopened, we
+          // should be ok. On close, we flushed using sequenceid obtained from
+          // hosting regionserver so no need to propagate the sequenceid returned
+          // out of initialize below up into regionserver as we normally do.
+          // TODO: Verify.
+          this.parent.initialize();
+        } catch (IOException e) {
+          LOG.error("Failed rollbacking CLOSED_PARENT_REGION of region " +
+            parent.getRegionInfo().getRegionNameAsString(), e);
+          throw new RuntimeException(e);
+        }
+        break;
+
+      case STARTED_REGION_A_CREATION:
+        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_a);
+        break;
+
+      case STARTED_REGION_B_CREATION:
+        this.parent.getRegionFileSystem().cleanupDaughterRegion(this.hri_b);
+        break;
+
+      case OFFLINED_PARENT:
+        if (services != null) services.addToOnlineRegions(this.parent);
+        break;
+
+      case PONR:
+        // We got to the point-of-no-return so we need to just abort. Return
+        // immediately.  Do not clean up created daughter regions.  They need
+        // to be in place so we don't delete the parent region mistakenly.
+        // See HBASE-3872.
+        return false;
+
+      // Informational only cases
+      case STARTED:
+      case PREPARED:
+      case BEFORE_PRE_SPLIT_HOOK:
+      case AFTER_PRE_SPLIT_HOOK:
+      case BEFORE_POST_SPLIT_HOOK:
+      case AFTER_POST_SPLIT_HOOK:
+      case OPENED_REGION_A:
+      case OPENED_REGION_B:
+      case COMPLETED:
+        break;
+
+      default:
+        throw new RuntimeException("Unhandled journal entry: " + je);
+      }
+    }
+    // Coprocessor callback
+    if (this.parent.getCoprocessorHost() != null) {
+      this.parent.getCoprocessorHost().postRollBackSplit();
+    }
+    return result;
+  }
+
+  /* package */ HRegionInfo getFirstDaughter() {
+    return hri_a;
+  }
+
+  /* package */ HRegionInfo getSecondDaughter() {
+    return hri_b;
+  }
+
+  @Override
+  public List<JournalEntry> getJournal() {
+    return journal;
+  }
+
+  @Override
+  public SplitTransaction registerTransactionListener(TransactionListener listener) {
+    listeners.add(listener);
+    return this;
+  }
+
+  @Override
+  public Server getServer() {
+    return server;
+  }
+
+  @Override
+  public RegionServerServices getRegionServerServices() {
+    return rsServices;
+  }
+
+}
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
index 6deade88145..10ecae334f5 100644
--- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestCoprocessorInterface.java
@@ -59,6 +59,7 @@ import org.apache.hadoop.hbase.regionserver.RegionCoprocessorHost;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScanType;
 import org.apache.hadoop.hbase.regionserver.SplitTransaction;
+import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory;
 import org.apache.hadoop.hbase.regionserver.Store;
 import org.apache.hadoop.hbase.regionserver.StoreFile;
 import org.apache.hadoop.hbase.testclassification.CoprocessorTests;
@@ -500,7 +501,8 @@
public class TestCoprocessorInterface { private Region [] split(final Region r, final byte [] splitRow) throws IOException { Region[] regions = new Region[2]; - SplitTransaction st = new SplitTransaction(r, splitRow); + SplitTransaction st = new SplitTransactionFactory(TEST_UTIL.getConfiguration()) + .create(r, splitRow); int i = 0; if (!st.prepare()) { @@ -509,8 +511,7 @@ public class TestCoprocessorInterface { } try { Server mockServer = Mockito.mock(Server.class); - when(mockServer.getConfiguration()).thenReturn( - TEST_UTIL.getConfiguration()); + when(mockServer.getConfiguration()).thenReturn(TEST_UTIL.getConfiguration()); PairOfSameType daughters = st.execute(mockServer, null); for (Region each_daughter: daughters) { regions[i] = each_daughter; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java index de43feb76a7..e013cbb2c5c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/coprocessor/TestRegionServerObserver.java @@ -40,7 +40,8 @@ import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; import org.apache.hadoop.hbase.regionserver.Region; -import org.apache.hadoop.hbase.regionserver.RegionMergeTransaction; +import org.apache.hadoop.hbase.regionserver.RegionMergeTransactionFactory; +import org.apache.hadoop.hbase.regionserver.RegionMergeTransactionImpl; import org.apache.hadoop.hbase.regionserver.RegionServerCoprocessorHost; import org.apache.hadoop.hbase.testclassification.CoprocessorTests; import org.apache.hadoop.hbase.testclassification.MediumTests; @@ -110,7 +111,7 @@ public class TestRegionServerObserver { } public static class CPRegionServerObserver extends BaseRegionServerObserver { - private 
RegionMergeTransaction rmt = null; + private RegionMergeTransactionImpl rmt = null; private HRegion mergedRegion = null; private boolean preMergeCalled; @@ -143,7 +144,8 @@ public class TestRegionServerObserver { HRegionServer rs = (HRegionServer) environment.getRegionServerServices(); List onlineRegions = rs.getOnlineRegions(TableName.valueOf("testRegionServerObserver_2")); - rmt = new RegionMergeTransaction(onlineRegions.get(0), onlineRegions.get(1), true); + rmt = (RegionMergeTransactionImpl) new RegionMergeTransactionFactory(rs.getConfiguration()) + .create(onlineRegions.get(0), onlineRegions.get(1), true); if (!rmt.prepare(rs)) { LOG.error("Prepare for the region merge of table " + onlineRegions.get(0).getTableDesc().getNameAsString() diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java index ca96f509ce5..be43950afae 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestEndToEndSplitTransaction.java @@ -106,7 +106,7 @@ public class TestEndToEndSplitTransaction { byte[] regionName = conn.getRegionLocator(tableName).getRegionLocation(splitRow) .getRegionInfo().getRegionName(); Region region = server.getRegion(regionName); - SplitTransaction split = new SplitTransaction(region, splitRow); + SplitTransactionImpl split = new SplitTransactionImpl((HRegion) region, splitRow); split.prepare(); // 1. 
phase I diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index cb8d0befc95..6a5e844f3b5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -2490,7 +2490,7 @@ public class TestHRegion { */ HRegion[] splitRegion(final HRegion parent, final byte[] midkey) throws IOException { PairOfSameType result = null; - SplitTransaction st = new SplitTransaction(parent, midkey); + SplitTransactionImpl st = new SplitTransactionImpl(parent, midkey); // If prepare does not return true, for some reason -- logged inside in // the prepare call -- we are not ready to split just now. Just return. if (!st.prepare()) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java index 4a2f9bae374..313a6ba2113 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransaction.java @@ -60,7 +60,7 @@ import org.mockito.Mockito; import com.google.common.collect.ImmutableList; /** - * Test the {@link RegionMergeTransaction} class against two HRegions (as + * Test the {@link RegionMergeTransactionImpl} class against two HRegions (as * opposed to running cluster). 
*/ @Category({RegionServerTests.class, SmallTests.class}) @@ -120,10 +120,10 @@ public class TestRegionMergeTransaction { prepareOnGoodRegions(); } - private RegionMergeTransaction prepareOnGoodRegions() throws IOException { - RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_b, + private RegionMergeTransactionImpl prepareOnGoodRegions() throws IOException { + RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, region_b, false); - RegionMergeTransaction spyMT = Mockito.spy(mt); + RegionMergeTransactionImpl spyMT = Mockito.spy(mt); doReturn(false).when(spyMT).hasMergeQualifierInMeta(null, region_a.getRegionInfo().getRegionName()); doReturn(false).when(spyMT).hasMergeQualifierInMeta(null, @@ -137,7 +137,7 @@ public class TestRegionMergeTransaction { */ @Test public void testPrepareWithSameRegion() throws IOException { - RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a, + RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(this.region_a, this.region_a, true); assertFalse("should not merge the same region even if it is forcible ", mt.prepare(null)); @@ -148,7 +148,7 @@ public class TestRegionMergeTransaction { */ @Test public void testPrepareWithRegionsNotAdjacent() throws IOException { - RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a, + RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(this.region_a, this.region_c, false); assertFalse("should not merge two regions if they are adjacent except it is forcible", mt.prepare(null)); @@ -160,9 +160,9 @@ public class TestRegionMergeTransaction { @Test public void testPrepareWithRegionsNotAdjacentUnderCompulsory() throws IOException { - RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_c, + RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, region_c, true); - RegionMergeTransaction spyMT = Mockito.spy(mt); + RegionMergeTransactionImpl spyMT = Mockito.spy(mt); 
doReturn(false).when(spyMT).hasMergeQualifierInMeta(null, region_a.getRegionInfo().getRegionName()); doReturn(false).when(spyMT).hasMergeQualifierInMeta(null, @@ -181,7 +181,7 @@ public class TestRegionMergeTransaction { when(storeMock.getFamily()).thenReturn(new HColumnDescriptor("cf")); when(storeMock.close()).thenReturn(ImmutableList.of()); this.region_a.stores.put(Bytes.toBytes(""), storeMock); - RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a, + RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(this.region_a, this.region_b, false); assertFalse( "a region should not be mergeable if it has instances of store file references", @@ -191,7 +191,7 @@ public class TestRegionMergeTransaction { @Test public void testPrepareWithClosedRegion() throws IOException { this.region_a.close(); - RegionMergeTransaction mt = new RegionMergeTransaction(this.region_a, + RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(this.region_a, this.region_b, false); assertFalse(mt.prepare(null)); } @@ -202,9 +202,9 @@ public class TestRegionMergeTransaction { */ @Test public void testPrepareWithRegionsWithMergeReference() throws IOException { - RegionMergeTransaction mt = new RegionMergeTransaction(region_a, region_b, + RegionMergeTransactionImpl mt = new RegionMergeTransactionImpl(region_a, region_b, false); - RegionMergeTransaction spyMT = Mockito.spy(mt); + RegionMergeTransactionImpl spyMT = Mockito.spy(mt); doReturn(true).when(spyMT).hasMergeQualifierInMeta(null, region_a.getRegionInfo().getRegionName()); doReturn(true).when(spyMT).hasMergeQualifierInMeta(null, @@ -221,14 +221,14 @@ public class TestRegionMergeTransaction { assertEquals(rowCountOfRegionB, countRows(this.region_b)); // Start transaction. - RegionMergeTransaction mt = prepareOnGoodRegions(); + RegionMergeTransactionImpl mt = prepareOnGoodRegions(); // Run the execute. Look at what it returns. 
TEST_UTIL.getConfiguration().setInt(HConstants.REGIONSERVER_PORT, 0); CoordinatedStateManager cp = CoordinatedStateManagerFactory.getCoordinatedStateManager( TEST_UTIL.getConfiguration()); Server mockServer = new HRegionServer(TEST_UTIL.getConfiguration(), cp); - HRegion mergedRegion = mt.execute(mockServer, null); + HRegion mergedRegion = (HRegion)mt.execute(mockServer, null); // Do some assertions about execution. assertTrue(this.fs.exists(mt.getMergesDir())); // Assert region_a and region_b is closed. @@ -265,7 +265,7 @@ public class TestRegionMergeTransaction { assertEquals(rowCountOfRegionB, countRows(this.region_b)); // Start transaction. - RegionMergeTransaction mt = prepareOnGoodRegions(); + RegionMergeTransactionImpl mt = prepareOnGoodRegions(); when(mt.createMergedRegionFromMerges(region_a, region_b, mt.getMergedRegionInfo())).thenThrow( @@ -301,7 +301,7 @@ public class TestRegionMergeTransaction { // Now retry the merge but do not throw an exception this time. assertTrue(mt.prepare(null)); - HRegion mergedRegion = mt.execute(mockServer, null); + HRegion mergedRegion = (HRegion)mt.execute(mockServer, null); // Count rows. daughters are already open // Count rows. merged region are already open try { @@ -325,7 +325,7 @@ public class TestRegionMergeTransaction { assertEquals(rowCountOfRegionB, countRows(this.region_b)); // Start transaction. 
- RegionMergeTransaction mt = prepareOnGoodRegions(); + RegionMergeTransactionImpl mt = prepareOnGoodRegions(); Mockito.doThrow(new MockedFailedMergedRegionOpen()) .when(mt) .openMergedRegion((Server) Mockito.anyObject(), @@ -365,31 +365,31 @@ public class TestRegionMergeTransaction { byte[] z = Bytes.toBytes("z"); HRegionInfo r1 = new HRegionInfo(tableName); HRegionInfo r2 = new HRegionInfo(tableName, a, z); - HRegionInfo m = RegionMergeTransaction.getMergedRegionInfo(r1, r2); + HRegionInfo m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2); assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey()) && Bytes.equals(m.getEndKey(), r1.getEndKey())); r1 = new HRegionInfo(tableName, null, a); r2 = new HRegionInfo(tableName, a, z); - m = RegionMergeTransaction.getMergedRegionInfo(r1, r2); + m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2); assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey()) && Bytes.equals(m.getEndKey(), r2.getEndKey())); r1 = new HRegionInfo(tableName, null, a); r2 = new HRegionInfo(tableName, z, null); - m = RegionMergeTransaction.getMergedRegionInfo(r1, r2); + m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2); assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey()) && Bytes.equals(m.getEndKey(), r2.getEndKey())); r1 = new HRegionInfo(tableName, a, z); r2 = new HRegionInfo(tableName, z, null); - m = RegionMergeTransaction.getMergedRegionInfo(r1, r2); + m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2); assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey()) && Bytes.equals(m.getEndKey(), r2.getEndKey())); r1 = new HRegionInfo(tableName, a, b); r2 = new HRegionInfo(tableName, b, z); - m = RegionMergeTransaction.getMergedRegionInfo(r1, r2); + m = RegionMergeTransactionImpl.getMergedRegionInfo(r1, r2); assertTrue(Bytes.equals(m.getStartKey(), r1.getStartKey()) && Bytes.equals(m.getEndKey(), r2.getEndKey())); } diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java index f4b6f02bcc8..2a949a101c5 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionMergeTransactionOnCluster.java @@ -81,7 +81,7 @@ import com.google.protobuf.ServiceException; /** * Like {@link TestRegionMergeTransaction} in that we're testing - * {@link RegionMergeTransaction} only the below tests are against a running + * {@link RegionMergeTransactionImpl} only the below tests are against a running * cluster where {@link TestRegionMergeTransaction} is tests against bare * {@link HRegion}. */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java index 0ed76457b41..da4f811702c 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransaction.java @@ -67,7 +67,7 @@ import org.mockito.Mockito; import com.google.common.collect.ImmutableList; /** - * Test the {@link SplitTransaction} class against an HRegion (as opposed to + * Test the {@link SplitTransactionImpl} class against an HRegion (as opposed to * running cluster). */ @Category({RegionServerTests.class, SmallTests.class}) @@ -120,8 +120,8 @@ public class TestSplitTransaction { assertEquals(rowcount, parentRowCount); // Start transaction. 
- SplitTransaction st = prepareGOOD_SPLIT_ROW(); - SplitTransaction spiedUponSt = spy(st); + SplitTransactionImpl st = prepareGOOD_SPLIT_ROW(); + SplitTransactionImpl spiedUponSt = spy(st); Mockito .doThrow(new MockedFailedDaughterOpen()) .when(spiedUponSt) @@ -161,12 +161,13 @@ public class TestSplitTransaction { prepareGOOD_SPLIT_ROW(); } - private SplitTransaction prepareGOOD_SPLIT_ROW() { + private SplitTransactionImpl prepareGOOD_SPLIT_ROW() throws IOException { return prepareGOOD_SPLIT_ROW(this.parent); } - private SplitTransaction prepareGOOD_SPLIT_ROW(final HRegion parentRegion) { - SplitTransaction st = new SplitTransaction(parentRegion, GOOD_SPLIT_ROW); + private SplitTransactionImpl prepareGOOD_SPLIT_ROW(final HRegion parentRegion) + throws IOException { + SplitTransactionImpl st = new SplitTransactionImpl(parentRegion, GOOD_SPLIT_ROW); assertTrue(st.prepare()); return st; } @@ -181,7 +182,7 @@ public class TestSplitTransaction { when(storeMock.close()).thenReturn(ImmutableList.of()); this.parent.stores.put(Bytes.toBytes(""), storeMock); - SplitTransaction st = new SplitTransaction(this.parent, GOOD_SPLIT_ROW); + SplitTransactionImpl st = new SplitTransactionImpl(this.parent, GOOD_SPLIT_ROW); assertFalse("a region should not be splittable if it has instances of store file references", st.prepare()); @@ -192,19 +193,19 @@ public class TestSplitTransaction { */ @Test public void testPrepareWithBadSplitRow() throws IOException { // Pass start row as split key. 
- SplitTransaction st = new SplitTransaction(this.parent, STARTROW); + SplitTransactionImpl st = new SplitTransactionImpl(this.parent, STARTROW); assertFalse(st.prepare()); - st = new SplitTransaction(this.parent, HConstants.EMPTY_BYTE_ARRAY); + st = new SplitTransactionImpl(this.parent, HConstants.EMPTY_BYTE_ARRAY); assertFalse(st.prepare()); - st = new SplitTransaction(this.parent, new byte [] {'A', 'A', 'A'}); + st = new SplitTransactionImpl(this.parent, new byte [] {'A', 'A', 'A'}); assertFalse(st.prepare()); - st = new SplitTransaction(this.parent, ENDROW); + st = new SplitTransactionImpl(this.parent, ENDROW); assertFalse(st.prepare()); } @Test public void testPrepareWithClosedRegion() throws IOException { this.parent.close(); - SplitTransaction st = new SplitTransaction(this.parent, GOOD_SPLIT_ROW); + SplitTransactionImpl st = new SplitTransactionImpl(this.parent, GOOD_SPLIT_ROW); assertFalse(st.prepare()); } @@ -220,7 +221,7 @@ public class TestSplitTransaction { ((LruBlockCache) cacheConf.getBlockCache()).clearCache(); // Start transaction. - SplitTransaction st = prepareGOOD_SPLIT_ROW(); + SplitTransactionImpl st = prepareGOOD_SPLIT_ROW(); // Run the execute. Look at what it returns. Server mockServer = Mockito.mock(Server.class); @@ -266,8 +267,8 @@ public class TestSplitTransaction { // Start transaction. HRegion spiedRegion = spy(this.parent); - SplitTransaction st = prepareGOOD_SPLIT_ROW(spiedRegion); - SplitTransaction spiedUponSt = spy(st); + SplitTransactionImpl st = prepareGOOD_SPLIT_ROW(spiedRegion); + SplitTransactionImpl spiedUponSt = spy(st); doThrow(new IOException("Failing split. Expected reference file count isn't equal.")) .when(spiedUponSt).assertReferenceFileCount(anyInt(), eq(new Path(this.parent.getRegionFileSystem().getTableDir(), @@ -294,8 +295,8 @@ public class TestSplitTransaction { // Start transaction. 
HRegion spiedRegion = spy(this.parent); - SplitTransaction st = prepareGOOD_SPLIT_ROW(spiedRegion); - SplitTransaction spiedUponSt = spy(st); + SplitTransactionImpl st = prepareGOOD_SPLIT_ROW(spiedRegion); + SplitTransactionImpl spiedUponSt = spy(st); doNothing().when(spiedUponSt).assertReferenceFileCount(anyInt(), eq(parent.getRegionFileSystem().getSplitsDir(st.getFirstDaughter()))); when(spiedRegion.createDaughterRegionFromSplits(spiedUponSt.getSecondDaughter())). diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java index 826495ddae3..66f8cbc0945 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestSplitTransactionOnCluster.java @@ -111,7 +111,7 @@ import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; /** - * Like TestSplitTransaction in that we're testing {@link SplitTransaction} + * Like TestSplitTransaction in that we're testing {@link SplitTransactionImpl} * only the below tests are against a running cluster where TestSplitTransaction * is tests against a bare {@link HRegion}. 
*/ @@ -264,7 +264,7 @@ public class TestSplitTransactionOnCluster { assertTrue(fileNum > store.getStorefiles().size()); // 3, Split - SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row3")); + SplitTransactionImpl st = new SplitTransactionImpl(region, Bytes.toBytes("row3")); assertTrue(st.prepare()); st.execute(regionServer, regionServer); assertEquals(2, cluster.getRegions(tableName).size()); @@ -474,7 +474,6 @@ public class TestSplitTransactionOnCluster { @Test(timeout = 180000) public void testSplitShouldNotThrowNPEEvenARegionHasEmptySplitFiles() throws Exception { - Configuration conf = TESTING_UTIL.getConfiguration(); TableName userTableName = TableName.valueOf("testSplitShouldNotThrowNPEEvenARegionHasEmptySplitFiles"); HTableDescriptor htd = new HTableDescriptor(userTableName); @@ -649,7 +648,7 @@ public class TestSplitTransactionOnCluster { assertEquals("The specified table should present.", true, tableExists); final HRegion region = findSplittableRegion(regions); assertTrue("not able to find a splittable region", region != null); - SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2")); + SplitTransactionImpl st = new SplitTransactionImpl(region, Bytes.toBytes("row2")); try { st.prepare(); st.createDaughters(regionServer, regionServer); @@ -707,7 +706,7 @@ public class TestSplitTransactionOnCluster { regionServerIndex = cluster.getServerWith(region.getRegionInfo().getRegionName()); regionServer = cluster.getRegionServer(regionServerIndex); assertTrue("not able to find a splittable region", region != null); - SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2")); + SplitTransactionImpl st = new SplitTransactionImpl(region, Bytes.toBytes("row2")); try { st.prepare(); st.execute(regionServer, regionServer); @@ -807,7 +806,7 @@ public class TestSplitTransactionOnCluster { assertTrue("not able to find a splittable region", region != null); // Now split. 
- SplitTransaction st = new MockedSplitTransaction(region, Bytes.toBytes("row2")); + SplitTransactionImpl st = new MockedSplitTransaction(region, Bytes.toBytes("row2")); try { st.prepare(); st.execute(regionServer, regionServer); @@ -882,7 +881,7 @@ public class TestSplitTransactionOnCluster { assertTrue("not able to find a splittable region", region != null); // Now split. - SplitTransaction st = new SplitTransaction(region, Bytes.toBytes("row2")); + SplitTransactionImpl st = new SplitTransactionImpl(region, Bytes.toBytes("row2")); try { st.prepare(); st.execute(regionServer, regionServer); @@ -976,7 +975,7 @@ public class TestSplitTransactionOnCluster { int serverWith = cluster.getServerWith(regions.get(0).getRegionInfo().getRegionName()); HRegionServer regionServer = cluster.getRegionServer(serverWith); cluster.getServerWith(regions.get(0).getRegionInfo().getRegionName()); - SplitTransaction st = new SplitTransaction(regions.get(0), Bytes.toBytes("r3")); + SplitTransactionImpl st = new SplitTransactionImpl(regions.get(0), Bytes.toBytes("r3")); st.prepare(); st.stepsBeforePONR(regionServer, regionServer, false); Path tableDir = @@ -1019,7 +1018,7 @@ public class TestSplitTransactionOnCluster { HRegionServer regionServer = cluster.getRegionServer(regionServerIndex); final HRegion region = findSplittableRegion(regions); assertTrue("not able to find a splittable region", region != null); - SplitTransaction st = new MockedSplitTransaction(region, Bytes.toBytes("row2")) { + SplitTransactionImpl st = new MockedSplitTransaction(region, Bytes.toBytes("row2")) { @Override public PairOfSameType stepsBeforePONR(final Server server, final RegionServerServices services, boolean testing) throws IOException { @@ -1080,7 +1079,7 @@ public class TestSplitTransactionOnCluster { } } - public static class MockedSplitTransaction extends SplitTransaction { + public static class MockedSplitTransaction extends SplitTransactionImpl { private HRegion currentRegion; public 
MockedSplitTransaction(HRegion region, byte[] splitrow) { @@ -1318,7 +1317,7 @@ public class TestSplitTransactionOnCluster { } public static class MockedRegionObserver extends BaseRegionObserver { - private SplitTransaction st = null; + private SplitTransactionImpl st = null; private PairOfSameType daughterRegions = null; @Override @@ -1335,7 +1334,7 @@ public class TestSplitTransactionOnCluster { break; } } - st = new SplitTransaction(region, splitKey); + st = new SplitTransactionImpl((HRegion) region, splitKey); if (!st.prepare()) { LOG.error("Prepare for the table " + region.getTableDesc().getNameAsString() + " failed. So returning null. "); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java index 8b6e84c1a33..48d0bfc5816 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestHBaseFsck.java @@ -100,7 +100,8 @@ import org.apache.hadoop.hbase.protobuf.generated.AdminProtos; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionFileSystem; import org.apache.hadoop.hbase.regionserver.HRegionServer; -import org.apache.hadoop.hbase.regionserver.SplitTransaction; +import org.apache.hadoop.hbase.regionserver.SplitTransactionFactory; +import org.apache.hadoop.hbase.regionserver.SplitTransactionImpl; import org.apache.hadoop.hbase.regionserver.TestEndToEndSplitTransaction; import org.apache.hadoop.hbase.security.access.AccessControlClient; import org.apache.hadoop.hbase.testclassification.LargeTests; @@ -1251,7 +1252,9 @@ public class TestHBaseFsck { int serverWith = cluster.getServerWith(regions.get(0).getRegionInfo().getRegionName()); HRegionServer regionServer = cluster.getRegionServer(serverWith); cluster.getServerWith(regions.get(0).getRegionInfo().getRegionName()); - SplitTransaction st = new 
SplitTransaction(regions.get(0), Bytes.toBytes("r3")); + SplitTransactionImpl st = (SplitTransactionImpl) + new SplitTransactionFactory(TEST_UTIL.getConfiguration()) + .create(regions.get(0), Bytes.toBytes("r3")); st.prepare(); st.stepsBeforePONR(regionServer, regionServer, false); AssignmentManager am = cluster.getMaster().getAssignmentManager();