From 34514ccf0ab29b921044d18d751e8507d6ee23dc Mon Sep 17 00:00:00 2001 From: Jingyun Tian Date: Wed, 29 May 2019 11:07:52 +0800 Subject: [PATCH] HBASE-22454 refactor WALSplitter --- .../ZKSplitLogManagerCoordination.java | 4 +- .../assignment/AssignmentManagerUtil.java | 4 +- .../MergeTableRegionsProcedure.java | 6 +- .../master/assignment/RegionStateStore.java | 4 +- .../assignment/SplitTableRegionProcedure.java | 8 +- .../procedure/DisableTableProcedure.java | 4 +- .../hadoop/hbase/regionserver/HRegion.java | 25 +- .../hbase/regionserver/RSRpcServices.java | 15 +- .../RegionReplicaReplicationEndpoint.java | 4 +- .../apache/hadoop/hbase/util/HBaseFsck.java | 4 +- .../BoundedLogWriterCreationOutputSink.java | 151 ++ .../apache/hadoop/hbase/wal/EntryBuffers.java | 158 ++ .../wal/LogRecoveredEditsOutputSink.java | 460 ++++++ .../apache/hadoop/hbase/wal/OutputSink.java | 252 +++ .../apache/hadoop/hbase/wal/WALSplitUtil.java | 523 ++++++ .../apache/hadoop/hbase/wal/WALSplitter.java | 1423 +---------------- .../hadoop/hbase/master/AbstractTestDLS.java | 6 +- ...DeleteColumnFamilyProcedureFromClient.java | 6 +- .../hbase/regionserver/TestHRegion.java | 14 +- .../regionserver/TestHRegionReplayEvents.java | 2 +- .../regionserver/TestRecoveredEdits.java | 4 +- .../TestRecoveredEditsReplayAndAbort.java | 4 +- .../wal/AbstractTestWALReplay.java | 14 +- .../snapshot/TestRestoreSnapshotHelper.java | 6 +- .../hbase/wal/TestReadWriteSeqIdFiles.java | 16 +- .../hadoop/hbase/wal/TestWALMethods.java | 27 +- .../apache/hadoop/hbase/wal/TestWALSplit.java | 14 +- 27 files changed, 1680 insertions(+), 1478 deletions(-) create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java create mode 100644 hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java index fac1532c1a8..3ff722a5c85 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/coordination/ZKSplitLogManagerCoordination.java @@ -41,7 +41,7 @@ import org.apache.hadoop.hbase.master.SplitLogManager.ResubmitDirective; import org.apache.hadoop.hbase.master.SplitLogManager.Task; import org.apache.hadoop.hbase.master.SplitLogManager.TerminationStatus; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.hadoop.hbase.zookeeper.ZKListener; import org.apache.hadoop.hbase.zookeeper.ZKMetadata; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; @@ -92,7 +92,7 @@ public class ZKSplitLogManagerCoordination extends ZKListener implements @Override public Status finish(ServerName workerName, String logfile) { try { - WALSplitter.finishSplitLogFile(logfile, conf); + WALSplitUtil.finishSplitLogFile(logfile, conf); } catch (IOException e) { LOG.warn("Could not finish splitting of log file " + logfile, e); return Status.ERR; diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java index 4f9343cf22d..d401141c9a4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/AssignmentManagerUtil.java @@ -33,7 +33,7 @@ import org.apache.hadoop.hbase.client.RegionReplicaUtil; import org.apache.hadoop.hbase.favored.FavoredNodesManager; import org.apache.hadoop.hbase.ipc.HBaseRpcController; import org.apache.hadoop.hbase.master.procedure.MasterProcedureEnv; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.yetus.audience.InterfaceAudience; import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; @@ -229,7 +229,7 @@ final class AssignmentManagerUtil { } static void checkClosedRegion(MasterProcedureEnv env, RegionInfo regionInfo) throws IOException { - if (WALSplitter.hasRecoveredEdits(env.getMasterConfiguration(), regionInfo)) { + if (WALSplitUtil.hasRecoveredEdits(env.getMasterConfiguration(), regionInfo)) { throw new IOException("Recovered.edits are found in Region: " + regionInfo + ", abort split/merge to prevent data loss"); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java index 241e8f94ef0..11ae8fe7bf0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/MergeTableRegionsProcedure.java @@ -59,7 +59,7 @@ import org.apache.hadoop.hbase.regionserver.StoreFileInfo; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -777,11 +777,11 @@ public class MergeTableRegionsProcedure long maxSequenceId = -1L; for (RegionInfo region : regionsToMerge) { maxSequenceId = - Math.max(maxSequenceId, WALSplitter.getMaxRegionSequenceId( + Math.max(maxSequenceId, WALSplitUtil.getMaxRegionSequenceId( walFS, getWALRegionDir(env, region))); } if (maxSequenceId > 0) { - WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion), + WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, mergedRegion), maxSequenceId); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java index ce4bc38cd38..a8491fecfc7 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/RegionStateStore.java @@ -42,7 +42,7 @@ import org.apache.hadoop.hbase.procedure2.util.StringUtils; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import 
org.apache.hadoop.hbase.zookeeper.MetaTableLocator; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -219,7 +219,7 @@ public class RegionStateStore { private long getOpenSeqNumForParentRegion(RegionInfo region) throws IOException { FileSystem walFS = master.getMasterWalManager().getFileSystem(); long maxSeqId = - WALSplitter.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir( + WALSplitUtil.getMaxRegionSequenceId(walFS, FSUtils.getWALRegionDir( master.getConfiguration(), region.getTable(), region.getEncodedName())); return maxSeqId > 0 ? maxSeqId + 1 : HConstants.NO_SEQNUM; } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java index 657f3976ce8..346905a3c63 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/assignment/SplitTableRegionProcedure.java @@ -68,7 +68,7 @@ import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.hadoop.util.ReflectionUtils; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -876,11 +876,11 @@ public class SplitTableRegionProcedure private void writeMaxSequenceIdFile(MasterProcedureEnv env) throws IOException { FileSystem walFS = env.getMasterServices().getMasterWalManager().getFileSystem(); long maxSequenceId = - WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion())); + WALSplitUtil.getMaxRegionSequenceId(walFS, getWALRegionDir(env, getParentRegion())); if (maxSequenceId > 0) { - WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterOneRI), + WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterOneRI), maxSequenceId); - WALSplitter.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterTwoRI), + WALSplitUtil.writeRegionSequenceIdFile(walFS, getWALRegionDir(env, daughterTwoRI), maxSequenceId); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java index 9cacc5d6b95..85a8226b80c 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/master/procedure/DisableTableProcedure.java @@ -34,7 +34,7 @@ import org.apache.hadoop.hbase.master.MasterCoprocessorHost; import org.apache.hadoop.hbase.master.TableStateManager; import org.apache.hadoop.hbase.procedure2.ProcedureStateSerializer; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +117,7 @@ public class DisableTableProcedure for (RegionInfo region : env.getAssignmentManager().getRegionStates() .getRegionsOfTable(tableName)) { long maxSequenceId = - WALSplitter.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region)); + 
WALSplitUtil.getMaxRegionSequenceId(walFS, getWALRegionDir(env, region)); long openSeqNum = maxSequenceId > 0 ? maxSequenceId + 1 : HConstants.NO_SEQNUM; mutator.mutate(MetaTableAccessor.makePutForReplicationBarrier(region, openSeqNum, EnvironmentEdgeManager.currentTime())); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index 24bafabb0ee..6e1487337d6 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -174,8 +174,8 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.apache.hadoop.hbase.wal.WALSplitter; -import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; +import org.apache.hadoop.hbase.wal.WALSplitUtil; +import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.util.StringUtils; import org.apache.htrace.core.TraceScope; @@ -1009,15 +1009,16 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // (particularly if no recovered edits, seqid will be -1). long nextSeqId = maxSeqId + 1; if (!isRestoredRegion) { - long maxSeqIdFromFile = - WALSplitter.getMaxRegionSequenceId(getWalFileSystem(), getWALRegionDirOfDefaultReplica()); + long maxSeqIdFromFile = WALSplitUtil.getMaxRegionSequenceId(getWalFileSystem(), + getWALRegionDirOfDefaultReplica()); nextSeqId = Math.max(maxSeqId, maxSeqIdFromFile) + 1; // The openSeqNum will always be increase even for read only region, as we rely on it to // determine whether a region has been successfully reopend, so here we always need to update // the max sequence id file. 
if (RegionReplicaUtil.isDefaultReplica(getRegionInfo())) { LOG.debug("writing seq id for {}", this.getRegionInfo().getEncodedName()); - WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), nextSeqId - 1); + WALSplitUtil.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), + nextSeqId - 1); } } @@ -1174,7 +1175,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // checking region folder exists is due to many tests which delete the table folder while a // table is still online if (getWalFileSystem().exists(getWALRegionDir())) { - WALSplitter.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), + WALSplitUtil.writeRegionSequenceIdFile(getWalFileSystem(), getWALRegionDir(), mvcc.getReadPoint()); } } @@ -4586,13 +4587,13 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi NavigableSet filesUnderRootDir = null; if (!regionDir.equals(defaultRegionDir)) { filesUnderRootDir = - WALSplitter.getSplitEditFilesSorted(rootFS, defaultRegionDir); + WALSplitUtil.getSplitEditFilesSorted(rootFS, defaultRegionDir); seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, rootFS, filesUnderRootDir, reporter, defaultRegionDir)); } - NavigableSet files = WALSplitter.getSplitEditFilesSorted(walFS, regionDir); + NavigableSet files = WALSplitUtil.getSplitEditFilesSorted(walFS, regionDir); seqid = Math.max(seqid, replayRecoveredEditsForPaths(minSeqIdForTheRegion, walFS, files, reporter, regionDir)); @@ -4605,7 +4606,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // For debugging data loss issues! // If this flag is set, make use of the hfile archiving by making recovered.edits a fake // column family. Have to fake out file type too by casting our recovered.edits as storefiles - String fakeFamilyName = WALSplitter.getRegionDirRecoveredEditsDir(regionDir).getName(); + String fakeFamilyName = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir).getName(); Set fakeStoreFiles = new HashSet<>(files.size()); for (Path file: files) { fakeStoreFiles.add( @@ -4682,7 +4683,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + " instead."); } if (skipErrors) { - Path p = WALSplitter.moveAsideBadEditsFile(fs, edits); + Path p = WALSplitUtil.moveAsideBadEditsFile(fs, edits); LOG.error(HConstants.HREGION_EDITS_REPLAY_SKIP_ERRORS + "=true so continuing. Renamed " + edits + " as " + p, e); @@ -4862,7 +4863,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi coprocessorHost.postReplayWALs(this.getRegionInfo(), edits); } } catch (EOFException eof) { - Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); + Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits); msg = "EnLongAddered EOF. Most likely due to Master failure during " + "wal splitting, so we have this data in another edit. " + "Continuing, but renaming " + edits + " as " + p; @@ -4872,7 +4873,7 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // If the IOE resulted from bad file format, // then this problem is idempotent and retrying won't help if (ioe.getCause() instanceof ParseException) { - Path p = WALSplitter.moveAsideBadEditsFile(walFS, edits); + Path p = WALSplitUtil.moveAsideBadEditsFile(walFS, edits); msg = "File corruption enLongAddered! 
" + "Continuing, but renaming " + edits + " as " + p; LOG.warn(msg, ioe); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index adfa84fbdd7..64ecbc20be4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -140,7 +140,8 @@ import org.apache.hadoop.hbase.util.Strings; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; +import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.yetus.audience.InterfaceAudience; import org.apache.zookeeper.KeeperException; @@ -1109,14 +1110,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, * @throws IOException */ private OperationStatus [] doReplayBatchOp(final HRegion region, - final List mutations, long replaySeqId) throws IOException { + final List mutations, long replaySeqId) throws IOException { long before = EnvironmentEdgeManager.currentTime(); boolean batchContainsPuts = false, batchContainsDelete = false; try { - for (Iterator it = mutations.iterator(); it.hasNext();) { - WALSplitter.MutationReplay m = it.next(); + for (Iterator it = mutations.iterator(); it.hasNext();) { + MutationReplay m = it.next(); - if (m.type == MutationType.PUT) { + if (m.getType() == MutationType.PUT) { batchContainsPuts = true; } else { batchContainsDelete = true; @@ -1160,7 +1161,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, regionServer.cacheFlusher.reclaimMemStoreMemory(); } return region.batchReplay(mutations.toArray( - new WALSplitter.MutationReplay[mutations.size()]), replaySeqId); + new MutationReplay[mutations.size()]), replaySeqId); } finally { updateMutationMetrics(region, before, batchContainsPuts, batchContainsDelete); } @@ -2217,7 +2218,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, entry.getKey().getWriteTime()); } Pair walEntry = (coprocessorHost == null) ? null : new Pair<>(); - List edits = WALSplitter.getMutationsFromWALEntry(entry, + List edits = WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, durability); if (coprocessorHost != null) { // Start coprocessor replay here. 
The coprocessor is for each WALEdit instead of a diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java index f7721e0934a..d1498a8b067 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/replication/regionserver/RegionReplicaReplicationEndpoint.java @@ -59,9 +59,9 @@ import org.apache.hadoop.hbase.replication.WALEntryFilter; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.hbase.wal.EntryBuffers; +import org.apache.hadoop.hbase.wal.OutputSink; import org.apache.hadoop.hbase.wal.WAL.Entry; -import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; -import org.apache.hadoop.hbase.wal.WALSplitter.OutputSink; import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; import org.apache.hadoop.hbase.wal.WALSplitter.SinkWriter; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java index a5c1fce83cb..f74906169a9 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/HBaseFsck.java @@ -133,7 +133,7 @@ import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandler; import org.apache.hadoop.hbase.util.hbck.TableIntegrityErrorHandlerImpl; import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALFactory; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.apache.hadoop.hbase.zookeeper.ZNodePaths; @@ -4463,7 +4463,7 @@ public class HBaseFsck extends Configured implements Closeable { // This is special case if a region is left after split he.hdfsOnlyEdits = true; FileStatus[] subDirs = fs.listStatus(regionDir.getPath()); - Path ePath = WALSplitter.getRegionDirRecoveredEditsDir(regionDir.getPath()); + Path ePath = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir.getPath()); for (FileStatus subDir : subDirs) { errors.progress(); String sdName = subDir.getPath().getName(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java new file mode 100644 index 00000000000..5fa7bef5c6b --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that manages the output streams from the log splitting process. + * Bounded means the output streams will be no more than the size of threadpool + */ +@InterfaceAudience.Private +public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink { + private static final Logger LOG = + LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class); + + private ConcurrentHashMap regionRecoverStatMap = new ConcurrentHashMap<>(); + + public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter, + WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { + super(walSplitter, controller, entryBuffers, numWriters); + } + + @Override + public List finishWritingAndClose() throws IOException { + boolean isSuccessful; + List result; + try { + isSuccessful = finishWriting(false); + } finally { + result = close(); + } + if (isSuccessful) { + splits = result; + } + return splits; + } + + @Override + boolean executeCloseTask(CompletionService completionService, List thrown, + List paths) throws InterruptedException, ExecutionException { + for (final Map.Entry buffer : entryBuffers.buffers + .entrySet()) { + LOG.info("Submitting writeThenClose of {}", + Arrays.toString(buffer.getValue().encodedRegionName)); + completionService.submit(new Callable() { + @Override + public Void call() throws Exception { + Path dst = writeThenClose(buffer.getValue()); + paths.add(dst); + return null; + } + }); + } + boolean progress_failed = false; + for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { + Future future = completionService.take(); + future.get(); + if (!progress_failed && reporter != null && !reporter.progress()) { + progress_failed = true; + } + } + + return progress_failed; + } + + /** + * since the splitting process may create multiple output files, we need a map + * regionRecoverStatMap to track the output count of each region. + * @return a map from encoded region ID to the number of edits written out for that region. 
+ */ + @Override + public Map getOutputCounts() { + Map regionRecoverStatMapResult = new HashMap<>(); + for (Map.Entry entry : regionRecoverStatMap.entrySet()) { + regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue()); + } + return regionRecoverStatMapResult; + } + + /** + * @return the number of recovered regions + */ + @Override + public int getNumberOfRecoveredRegions() { + return regionRecoverStatMap.size(); + } + + /** + * Append the buffer to a new recovered edits file, then close it after all done + * @param buffer contain all entries of a certain region + * @throws IOException when closeWriter failed + */ + @Override + public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException { + writeThenClose(buffer); + } + + private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException { + WALSplitter.WriterAndPath wap = appendBuffer(buffer, false); + if (wap != null) { + String encodedRegionName = Bytes.toString(buffer.encodedRegionName); + Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten); + if (value != null) { + Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten; + regionRecoverStatMap.put(encodedRegionName, newValue); + } + } + + Path dst = null; + List thrown = new ArrayList<>(); + if (wap != null) { + dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown); + } + if (!thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + return dst; + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java new file mode 100644 index 00000000000..f0974be11d8 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; + +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; +import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; +import org.apache.yetus.audience.InterfaceAudience; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class which accumulates edits and separates them into a buffer per region while simultaneously + * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then + * pull region-specific buffers from this class. 
+ */ +@InterfaceAudience.Private +public class EntryBuffers { + private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class); + + PipelineController controller; + + Map buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR); + + /* + * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick + * up bytes from a region if we're already writing data for that region in a different IO thread. + */ + Set currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR); + + long totalBuffered = 0; + long maxHeapUsage; + boolean splitWriterCreationBounded; + + public EntryBuffers(PipelineController controller, long maxHeapUsage) { + this(controller, maxHeapUsage, false); + } + + public EntryBuffers(PipelineController controller, long maxHeapUsage, + boolean splitWriterCreationBounded) { + this.controller = controller; + this.maxHeapUsage = maxHeapUsage; + this.splitWriterCreationBounded = splitWriterCreationBounded; + } + + /** + * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has + * crossed the specified threshold. + */ + public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException { + WALKey key = entry.getKey(); + RegionEntryBuffer buffer; + long incrHeap; + synchronized (this) { + buffer = buffers.get(key.getEncodedRegionName()); + if (buffer == null) { + buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName()); + buffers.put(key.getEncodedRegionName(), buffer); + } + incrHeap = buffer.appendEntry(entry); + } + + // If we crossed the chunk threshold, wait for more space to be available + synchronized (controller.dataAvailable) { + totalBuffered += incrHeap; + while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { + LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered); + controller.dataAvailable.wait(2000); + } + controller.dataAvailable.notifyAll(); + } + controller.checkForErrors(); + } + + /** + * @return RegionEntryBuffer a buffer of edits to be written. + */ + synchronized RegionEntryBuffer getChunkToWrite() { + // The core part of limiting opening writers is it doesn't return chunk only if the + // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each + // region during splitting. It will flush all the logs in the buffer after splitting + // through a threadpool, which means the number of writers it created is under control. 
+ if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) { + return null; + } + long biggestSize = 0; + byte[] biggestBufferKey = null; + + for (Map.Entry entry : buffers.entrySet()) { + long size = entry.getValue().heapSize(); + if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) { + biggestSize = size; + biggestBufferKey = entry.getKey(); + } + } + if (biggestBufferKey == null) { + return null; + } + + RegionEntryBuffer buffer = buffers.remove(biggestBufferKey); + currentlyWriting.add(biggestBufferKey); + return buffer; + } + + void doneWriting(RegionEntryBuffer buffer) { + synchronized (this) { + boolean removed = currentlyWriting.remove(buffer.encodedRegionName); + assert removed; + } + long size = buffer.heapSize(); + + synchronized (controller.dataAvailable) { + totalBuffered -= size; + // We may unblock writers + controller.dataAvailable.notifyAll(); + } + } + + synchronized boolean isRegionCurrentlyWriting(byte[] region) { + return currentlyWriting.contains(region); + } + + public void waitUntilDrained() { + synchronized (controller.dataAvailable) { + while (totalBuffered > 0) { + try { + controller.dataAvailable.wait(2000); + } catch (InterruptedException e) { + LOG.warn("Got interrupted while waiting for EntryBuffers is drained"); + Thread.interrupted(); + break; + } + } + } + } +} \ No newline at end of file diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java new file mode 100644 index 00000000000..aa649e42b36 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java @@ -0,0 +1,460 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath; +import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.Callable; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Threads; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; + +/** + * Class that manages the output streams from the log splitting process. + */ +@InterfaceAudience.Private +public class LogRecoveredEditsOutputSink extends OutputSink { + private static final Logger LOG = LoggerFactory.getLogger(LogRecoveredEditsOutputSink.class); + private WALSplitter walSplitter; + private FileSystem walFS; + private Configuration conf; + + public LogRecoveredEditsOutputSink(WALSplitter walSplitter, + WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { + // More threads could potentially write faster at the expense + // of causing more disk seeks as the logs are split. + // 3. After a certain setting (probably around 3) the + // process will be bound on the reader in the current + // implementation anyway. 
+ super(controller, entryBuffers, numWriters); + this.walSplitter = walSplitter; + this.walFS = walSplitter.walFS; + this.conf = walSplitter.conf; + } + + /** + * @return null if failed to report progress + */ + @Override + public List finishWritingAndClose() throws IOException { + boolean isSuccessful = false; + List result = null; + try { + isSuccessful = finishWriting(false); + } finally { + result = close(); + List thrown = closeLogWriters(null); + if (CollectionUtils.isNotEmpty(thrown)) { + throw MultipleIOException.createIOException(thrown); + } + } + if (isSuccessful) { + splits = result; + } + return splits; + } + + // delete the one with fewer wal entries + private void deleteOneWithFewerEntries(WALSplitter.WriterAndPath wap, Path dst) + throws IOException { + long dstMinLogSeqNum = -1L; + try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) { + WAL.Entry entry = reader.next(); + if (entry != null) { + dstMinLogSeqNum = entry.getKey().getSequenceId(); + } + } catch (EOFException e) { + LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst, + e); + } + if (wap.minLogSeqNum < dstMinLogSeqNum) { + LOG.warn("Found existing old edits file. It could be the result of a previous failed" + + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" + + walFS.getFileStatus(dst).getLen()); + if (!walFS.delete(dst, false)) { + LOG.warn("Failed deleting of old {}", dst); + throw new IOException("Failed deleting of old " + dst); + } + } else { + LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.path + + ", length=" + walFS.getFileStatus(wap.path).getLen()); + if (!walFS.delete(wap.path, false)) { + LOG.warn("Failed deleting of {}", wap.path); + throw new IOException("Failed deleting of " + wap.path); + } + } + } + + /** + * Close all of the output streams. + * @return the list of paths written. 
+ */ + List close() throws IOException { + Preconditions.checkState(!closeAndCleanCompleted); + + final List paths = new ArrayList<>(); + final List thrown = Lists.newArrayList(); + ThreadPoolExecutor closeThreadPool = + Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { + private int count = 1; + + @Override + public Thread newThread(Runnable r) { + Thread t = new Thread(r, "split-log-closeStream-" + count++); + return t; + } + }); + CompletionService completionService = new ExecutorCompletionService<>(closeThreadPool); + boolean progress_failed; + try { + progress_failed = executeCloseTask(completionService, thrown, paths); + } catch (InterruptedException e) { + IOException iie = new InterruptedIOException(); + iie.initCause(e); + throw iie; + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + closeThreadPool.shutdownNow(); + } + if (!thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + writersClosed = true; + closeAndCleanCompleted = true; + if (progress_failed) { + return null; + } + return paths; + } + + /** + * @param completionService threadPool to execute the closing tasks + * @param thrown store the exceptions + * @param paths arrayList to store the paths written + * @return if close tasks executed successful + */ + boolean executeCloseTask(CompletionService completionService, List thrown, + List paths) throws InterruptedException, ExecutionException { + for (final Map.Entry writersEntry : writers.entrySet()) { + if (LOG.isTraceEnabled()) { + LOG.trace( + "Submitting close of " + ((WALSplitter.WriterAndPath) writersEntry.getValue()).path); + } + completionService.submit(new Callable() { + @Override + public Void call() throws Exception { + WALSplitter.WriterAndPath wap = (WALSplitter.WriterAndPath) writersEntry.getValue(); + Path dst = closeWriter(writersEntry.getKey(), wap, thrown); + paths.add(dst); + return null; + } + }); + } + boolean progress_failed = false; + for (int i = 0, n = this.writers.size(); i < n; i++) { + Future future = completionService.take(); + future.get(); + if (!progress_failed && reporter != null && !reporter.progress()) { + progress_failed = true; + } + } + return progress_failed; + } + + Path closeWriter(String encodedRegionName, WALSplitter.WriterAndPath wap, + List thrown) throws IOException { + LOG.trace("Closing {}", wap.path); + try { + wap.writer.close(); + } catch (IOException ioe) { + LOG.error("Could not close log at {}", wap.path, ioe); + thrown.add(ioe); + return null; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Closed wap " + wap.path + " (wrote " + wap.editsWritten + " edits, skipped " + + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms"); + } + if (wap.editsWritten == 0) { + // just remove the empty recovered.edits file + if (walFS.exists(wap.path) && !walFS.delete(wap.path, false)) { + LOG.warn("Failed deleting empty {}", wap.path); + throw new IOException("Failed deleting empty " + wap.path); + } + return null; + } + + Path dst = getCompletedRecoveredEditsFilePath(wap.path, + regionMaximumEditLogSeqNum.get(encodedRegionName)); + try { + if (!dst.equals(wap.path) && walFS.exists(dst)) { + deleteOneWithFewerEntries(wap, dst); + } + // Skip the unit tests which create a splitter that reads and + // writes the data without touching disk. + // TestHLogSplit#testThreading is an example. 
+ if (walFS.exists(wap.path)) { + if (!walFS.rename(wap.path, dst)) { + throw new IOException("Failed renaming " + wap.path + " to " + dst); + } + LOG.info("Rename {} to {}", wap.path, dst); + } + } catch (IOException ioe) { + LOG.error("Could not rename {} to {}", wap.path, dst, ioe); + thrown.add(ioe); + return null; + } + return dst; + } + + private List closeLogWriters(List thrown) throws IOException { + if (writersClosed) { + return thrown; + } + if (thrown == null) { + thrown = Lists.newArrayList(); + } + try { + for (WriterThread writerThread : writerThreads) { + while (writerThread.isAlive()) { + writerThread.setShouldStop(true); + writerThread.interrupt(); + try { + writerThread.join(10); + } catch (InterruptedException e) { + IOException iie = new InterruptedIOException(); + iie.initCause(e); + throw iie; + } + } + } + } finally { + WALSplitter.WriterAndPath wap = null; + for (WALSplitter.SinkWriter tmpWAP : writers.values()) { + try { + wap = (WALSplitter.WriterAndPath) tmpWAP; + wap.writer.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close log at {}", wap.path, ioe); + thrown.add(ioe); + continue; + } + LOG.info("Closed log " + wap.path + " (wrote " + wap.editsWritten + " edits in " + + (wap.nanosSpent / 1000 / 1000) + "ms)"); + } + writersClosed = true; + } + + return thrown; + } + + /** + * Get a writer and path for a log starting at the given entry. This function is threadsafe so + * long as multiple threads are always acting on different regions. + * @return null if this region shouldn't output any logs + */ + WALSplitter.WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException { + byte[] region = entry.getKey().getEncodedRegionName(); + String regionName = Bytes.toString(region); + WALSplitter.WriterAndPath ret = (WALSplitter.WriterAndPath) writers.get(regionName); + if (ret != null) { + return ret; + } + // If we already decided that this region doesn't get any output + // we don't need to check again. + if (blacklistedRegions.contains(region)) { + return null; + } + ret = createWAP(region, entry); + if (ret == null) { + blacklistedRegions.add(region); + return null; + } + if (reusable) { + writers.put(regionName, ret); + } + return ret; + } + + /** + * @return a path with a write for that path. caller should close. + */ + WALSplitter.WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException { + String tmpDirName = walSplitter.conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, + HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); + Path regionedits = getRegionSplitEditsPath(entry, + walSplitter.getFileBeingSplit().getPath().getName(), tmpDirName, conf); + if (regionedits == null) { + return null; + } + FileSystem walFs = FSUtils.getWALFileSystem(conf); + if (walFs.exists(regionedits)) { + LOG.warn("Found old edits file. It could be the " + + "result of a previous failed split attempt. 
Deleting " + regionedits + ", length=" + + walFs.getFileStatus(regionedits).getLen()); + if (!walFs.delete(regionedits, false)) { + LOG.warn("Failed delete of old {}", regionedits); + } + } + WALProvider.Writer w = walSplitter.createWriter(regionedits); + LOG.debug("Creating writer path={}", regionedits); + return new WALSplitter.WriterAndPath(regionedits, w, entry.getKey().getSequenceId()); + } + + + + void filterCellByStore(WAL.Entry logEntry) { + Map maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores() + .get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); + if (MapUtils.isEmpty(maxSeqIdInStores)) { + return; + } + // Create the array list for the cells that aren't filtered. + // We make the assumption that most cells will be kept. + ArrayList keptCells = new ArrayList<>(logEntry.getEdit().getCells().size()); + for (Cell cell : logEntry.getEdit().getCells()) { + if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { + keptCells.add(cell); + } else { + byte[] family = CellUtil.cloneFamily(cell); + Long maxSeqId = maxSeqIdInStores.get(family); + // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, + // or the master was crashed before and we can not get the information. + if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) { + keptCells.add(cell); + } + } + } + + // Anything in the keptCells array list is still live. + // So rather than removing the cells from the array list + // which would be an O(n^2) operation, we just replace the list + logEntry.getEdit().setCells(keptCells); + } + + @Override + public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException { + appendBuffer(buffer, true); + } + + WALSplitter.WriterAndPath appendBuffer(WALSplitter.RegionEntryBuffer buffer, boolean reusable) + throws IOException { + List entries = buffer.entryBuffer; + if (entries.isEmpty()) { + LOG.warn("got an empty buffer, skipping"); + return null; + } + + WALSplitter.WriterAndPath wap = null; + + long startTime = System.nanoTime(); + try { + int editsCount = 0; + + for (WAL.Entry logEntry : entries) { + if (wap == null) { + wap = getWriterAndPath(logEntry, reusable); + if (wap == null) { + // This log spews the full edit. Can be massive in the log. Enable only debugging + // WAL lost edit issues. + LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry); + return null; + } + } + filterCellByStore(logEntry); + if (!logEntry.getEdit().isEmpty()) { + wap.writer.append(logEntry); + this.updateRegionMaximumEditLogSeqNum(logEntry); + editsCount++; + } else { + wap.incrementSkippedEdits(1); + } + } + // Pass along summary statistics + wap.incrementEdits(editsCount); + wap.incrementNanoTime(System.nanoTime() - startTime); + } catch (IOException e) { + e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; + LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e); + throw e; + } + return wap; + } + + @Override + public boolean keepRegionEvent(WAL.Entry entry) { + ArrayList cells = entry.getEdit().getCells(); + for (Cell cell : cells) { + if (WALEdit.isCompactionMarker(cell)) { + return true; + } + } + return false; + } + + /** + * @return a map from encoded region ID to the number of edits written out for that region. 
+ */ + @Override + public Map getOutputCounts() { + TreeMap ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (Map.Entry entry : writers.entrySet()) { + ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten); + } + return ret; + } + + @Override + public int getNumberOfRecoveredRegions() { + return writers.size(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java new file mode 100644 index 00000000000..729ea8b3e62 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +/** + * The following class is an abstraction class to provide a common interface to support different + * ways of consuming recovered edits. 
+ */ +@InterfaceAudience.Private +public abstract class OutputSink { + private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class); + + protected WALSplitter.PipelineController controller; + protected EntryBuffers entryBuffers; + + protected ConcurrentHashMap writers = new ConcurrentHashMap<>(); + protected final ConcurrentHashMap regionMaximumEditLogSeqNum = + new ConcurrentHashMap<>(); + + protected final List writerThreads = Lists.newArrayList(); + + /* Set of regions which we've decided should not output edits */ + protected final Set blacklistedRegions = + Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR)); + + protected boolean closeAndCleanCompleted = false; + + protected boolean writersClosed = false; + + protected final int numThreads; + + protected CancelableProgressable reporter = null; + + protected AtomicLong skippedEdits = new AtomicLong(); + + protected List splits = null; + + public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, + int numWriters) { + numThreads = numWriters; + this.controller = controller; + this.entryBuffers = entryBuffers; + } + + void setReporter(CancelableProgressable reporter) { + this.reporter = reporter; + } + + /** + * Start the threads that will pump data from the entryBuffers to the output files. + */ + public synchronized void startWriterThreads() { + for (int i = 0; i < numThreads; i++) { + WriterThread t = new WriterThread(controller, entryBuffers, this, i); + t.start(); + writerThreads.add(t); + } + } + + /** + * Update region's maximum edit log SeqNum. + */ + void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) { + synchronized (regionMaximumEditLogSeqNum) { + String regionName = Bytes.toString(entry.getKey().getEncodedRegionName()); + Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName); + if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { + regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId()); + } + } + } + + /** + * @return the number of currently opened writers + */ + int getNumOpenWriters() { + return this.writers.size(); + } + + long getSkippedEdits() { + return this.skippedEdits.get(); + } + + /** + * Wait for writer threads to dump all info to the sink + * @return true when there is no error + */ + protected boolean finishWriting(boolean interrupt) throws IOException { + LOG.debug("Waiting for split writer threads to finish"); + boolean progress_failed = false; + for (WriterThread t : writerThreads) { + t.finish(); + } + if (interrupt) { + for (WriterThread t : writerThreads) { + t.interrupt(); // interrupt the writer threads. We are stopping now. + } + } + + for (WriterThread t : writerThreads) { + if (!progress_failed && reporter != null && !reporter.progress()) { + progress_failed = true; + } + try { + t.join(); + } catch (InterruptedException ie) { + IOException iie = new InterruptedIOException(); + iie.initCause(ie); + throw iie; + } + } + controller.checkForErrors(); + LOG.info("{} split writers finished; closing.", this.writerThreads.size()); + return (!progress_failed); + } + + public abstract List finishWritingAndClose() throws IOException; + + /** + * @return a map from encoded region ID to the number of edits written out for that region. 
+ */ + public abstract Map getOutputCounts(); + + /** + * @return number of regions we've recovered + */ + public abstract int getNumberOfRecoveredRegions(); + + /** + * @param buffer A WAL Edit Entry + */ + public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException; + + /** + * WriterThread call this function to help flush internal remaining edits in buffer before close + * @return true when underlying sink has something to flush + */ + public boolean flush() throws IOException { + return false; + } + + /** + * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will + * want to get all of those edits. + * @return Return true if this sink wants to accept this region-level WALEdit. + */ + public abstract boolean keepRegionEvent(WAL.Entry entry); + + public static class WriterThread extends Thread { + private volatile boolean shouldStop = false; + private WALSplitter.PipelineController controller; + private EntryBuffers entryBuffers; + private OutputSink outputSink = null; + + WriterThread(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, + OutputSink sink, int i) { + super(Thread.currentThread().getName() + "-Writer-" + i); + this.controller = controller; + this.entryBuffers = entryBuffers; + outputSink = sink; + } + + @Override + public void run() { + try { + doRun(); + } catch (Throwable t) { + LOG.error("Exiting thread", t); + controller.writerThreadError(t); + } + } + + private void doRun() throws IOException { + LOG.trace("Writer thread starting"); + while (true) { + WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); + if (buffer == null) { + // No data currently available, wait on some more to show up + synchronized (controller.dataAvailable) { + if (shouldStop && !this.outputSink.flush()) { + return; + } + try { + controller.dataAvailable.wait(500); + } catch (InterruptedException ie) { + if (!shouldStop) { + throw new RuntimeException(ie); + } + } + } + continue; + } + + assert buffer != null; + try { + writeBuffer(buffer); + } finally { + entryBuffers.doneWriting(buffer); + } + } + } + + private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException { + outputSink.append(buffer); + } + + void setShouldStop(boolean shouldStop) { + this.shouldStop = shouldStop; + } + + void finish() { + synchronized (controller.dataAvailable) { + shouldStop = true; + controller.dataAvailable.notifyAll(); + } + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java new file mode 100644 index 00000000000..d518f2e5c31 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java @@ -0,0 +1,523 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.NavigableSet; +import java.util.TreeSet; +import java.util.UUID; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.ArrayUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileAlreadyExistsException; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellScanner; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Delete; +import org.apache.hadoop.hbase.client.Durability; +import org.apache.hadoop.hbase.client.Mutation; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.client.RegionInfo; +import org.apache.hadoop.hbase.regionserver.HRegion; +import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.FSUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; + +/** + * This class provides static methods to support WAL splitting related works + */ +@InterfaceAudience.Private +public final class WALSplitUtil { + private static final Logger LOG = LoggerFactory.getLogger(WALSplitUtil.class); + + private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); + private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; + private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid"; + private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid"; + private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length(); + + private WALSplitUtil() { + } + + /** + * Completes the work done by splitLogFile by archiving logs + *

+ * It is invoked by SplitLogManager once it knows that one of the SplitLogWorkers has completed + * the splitLogFile() part. If the master crashes then this function might get called multiple + * times. + *
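As a usage note, here is a minimal sketch of driving this archiving step directly, assuming the caller already has an HBase Configuration and knows which WAL file was just split; the class name and the way the log name is obtained are illustrative only, not part of this patch.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.WALSplitUtil;

public class FinishSplitLogFileSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    // Either an absolute path under the WAL root or a name relative to it is accepted.
    String logfile = args[0];
    // Archives the split WAL (or moves it to .corrupt if marked corrupted) and
    // deletes the per-WAL splitting staging directory.
    WALSplitUtil.finishSplitLogFile(logfile, conf);
  }
}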

+ * @param logfile + * @param conf + * @throws IOException + */ + public static void finishSplitLogFile(String logfile, Configuration conf) throws IOException { + Path walDir = FSUtils.getWALRootDir(conf); + Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); + Path walPath; + if (FSUtils.isStartingWithPath(walDir, logfile)) { + walPath = new Path(logfile); + } else { + walPath = new Path(walDir, logfile); + } + finishSplitLogFile(walDir, oldLogDir, walPath, conf); + } + + static void finishSplitLogFile(Path walDir, Path oldWALDir, Path walPath, + Configuration conf) throws IOException { + List processedLogs = new ArrayList<>(); + List corruptedLogs = new ArrayList<>(); + FileSystem walFS = walDir.getFileSystem(conf); + if (ZKSplitLog.isCorrupted(walDir, walPath.getName(), walFS)) { + corruptedLogs.add(walPath); + } else { + processedLogs.add(walPath); + } + archiveWALs(corruptedLogs, processedLogs, oldWALDir, walFS, conf); + Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, walPath.getName()); + walFS.delete(stagingDir, true); + } + + /** + * Moves processed logs to a oldLogDir after successful processing Moves corrupted logs (any log + * that couldn't be successfully parsed to corruptDir (.corrupt) for later investigation + */ + private static void archiveWALs(final List corruptedWALs, final List processedWALs, + final Path oldWALDir, final FileSystem walFS, final Configuration conf) throws IOException { + final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); + if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { + LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", + corruptDir); + } + if (!walFS.mkdirs(corruptDir)) { + LOG.info("Unable to mkdir {}", corruptDir); + } + walFS.mkdirs(oldWALDir); + + // this method can get restarted or called multiple times for archiving + // the same log files. + for (Path corruptedWAL : corruptedWALs) { + Path p = new Path(corruptDir, corruptedWAL.getName()); + if (walFS.exists(corruptedWAL)) { + if (!walFS.rename(corruptedWAL, p)) { + LOG.warn("Unable to move corrupted log {} to {}", corruptedWAL, p); + } else { + LOG.warn("Moved corrupted log {} to {}", corruptedWAL, p); + } + } + } + + for (Path p : processedWALs) { + Path newPath = AbstractFSWAL.getWALArchivePath(oldWALDir, p); + if (walFS.exists(p)) { + if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) { + LOG.warn("Unable to move {} to {}", p, newPath); + } else { + LOG.info("Archived processed log {} to {}", p, newPath); + } + } + } + } + + /** + * Path to a file under RECOVERED_EDITS_DIR directory of the region found in logEntry + * named for the sequenceid in the passed logEntry: e.g. + * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of + * RECOVERED_EDITS_DIR under the region creating it if necessary. + * @param walEntry walEntry to recover + * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name. + * @param tmpDirName of the directory used to sideline old recovered edits file + * @param conf configuration + * @return Path to file into which to dump split log edits. 
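For the naming convention described above, a small illustrative sketch of how the recovered-edits file name is derived from the sequence id; the region directory below is made up, and since formatRecoveredEditsFileName is package-private the format string is inlined here.

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HConstants;

public class RecoveredEditsNamingSketch {
  public static void main(String[] args) {
    // Same pattern as formatRecoveredEditsFileName: a 19-digit, zero-padded sequence id.
    long seqId = 2332L;
    String fileName = String.format("%019d", seqId); // "0000000000000002332"
    // Hypothetical WAL region dir; real callers resolve it via FSUtils.getWALRegionDir(...).
    Path regionDir = new Path("/hbase/data/default/some_table/2323432434");
    Path editsDir = new Path(regionDir, HConstants.RECOVERED_EDITS_DIR);
    System.out.println(new Path(editsDir, fileName));
    // -> /hbase/data/default/some_table/2323432434/recovered.edits/0000000000000002332
  }
}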
+ * @throws IOException + */ + @SuppressWarnings("deprecation") + @VisibleForTesting + static Path getRegionSplitEditsPath(final WAL.Entry walEntry, String fileNameBeingSplit, + String tmpDirName, Configuration conf) throws IOException { + FileSystem walFS = FSUtils.getWALFileSystem(conf); + Path tableDir = FSUtils.getWALTableDir(conf, walEntry.getKey().getTableName()); + String encodedRegionName = Bytes.toString(walEntry.getKey().getEncodedRegionName()); + Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName); + Path dir = getRegionDirRecoveredEditsDir(regionDir); + + if (walFS.exists(dir) && walFS.isFile(dir)) { + Path tmp = new Path(tmpDirName); + if (!walFS.exists(tmp)) { + walFS.mkdirs(tmp); + } + tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); + LOG.warn("Found existing old file: {}. It could be some " + + "leftover of an old installation. It should be a folder instead. " + + "So moving it to {}", + dir, tmp); + if (!walFS.rename(dir, tmp)) { + LOG.warn("Failed to sideline old file {}", dir); + } + } + + if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { + LOG.warn("mkdir failed on {}", dir); + } + // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. + // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure + // region's replayRecoveredEdits will not delete it + String fileName = formatRecoveredEditsFileName(walEntry.getKey().getSequenceId()); + fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit); + return new Path(dir, fileName); + } + + private static String getTmpRecoveredEditsFileName(String fileName) { + return fileName + RECOVERED_LOG_TMPFILE_SUFFIX; + } + + /** + * Get the completed recovered edits file path, renaming it to be by last edit in the file from + * its first edit. Then we could use the name to skip recovered edits when doing + * {@link HRegion#replayRecoveredEditsIfAny}. + * @return dstPath take file's last edit log seq num as the name + */ + static Path getCompletedRecoveredEditsFilePath(Path srcPath, long maximumEditWALSeqNum) { + String fileName = formatRecoveredEditsFileName(maximumEditWALSeqNum); + return new Path(srcPath.getParent(), fileName); + } + + @VisibleForTesting + static String formatRecoveredEditsFileName(final long seqid) { + return String.format("%019d", seqid); + } + + /** + * @param regionDir This regions directory in the filesystem. + * @return The directory that holds recovered edits files for the region regionDir + */ + public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { + return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); + } + + /** + * Check whether there is recovered.edits in the region dir + * @param conf conf + * @param regionInfo the region to check + * @throws IOException IOException + * @return true if recovered.edits exist in the region dir + */ + public static boolean hasRecoveredEdits(final Configuration conf, final RegionInfo regionInfo) + throws IOException { + // No recovered.edits for non default replica regions + if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { + return false; + } + // Only default replica region can reach here, so we can use regioninfo + // directly without converting it to default replica's regioninfo. 
+ Path regionDir = + FSUtils.getWALRegionDir(conf, regionInfo.getTable(), regionInfo.getEncodedName()); + NavigableSet files = getSplitEditFilesSorted(FSUtils.getWALFileSystem(conf), regionDir); + return files != null && !files.isEmpty(); + } + + /** + * Returns sorted set of edit files made by splitter, excluding files with '.temp' suffix. + * @param walFS WAL FileSystem used to retrieving split edits files. + * @param regionDir WAL region dir to look for recovered edits files under. + * @return Files in passed regionDir as a sorted set. + * @throws IOException + */ + public static NavigableSet getSplitEditFilesSorted(final FileSystem walFS, + final Path regionDir) throws IOException { + NavigableSet filesSorted = new TreeSet<>(); + Path editsdir = getRegionDirRecoveredEditsDir(regionDir); + if (!walFS.exists(editsdir)) { + return filesSorted; + } + FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { + @Override + public boolean accept(Path p) { + boolean result = false; + try { + // Return files and only files that match the editfile names pattern. + // There can be other files in this directory other than edit files. + // In particular, on error, we'll move aside the bad edit file giving + // it a timestamp suffix. See moveAsideBadEditsFile. + Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); + result = walFS.isFile(p) && m.matches(); + // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, + // because it means splitwal thread is writting this file. + if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { + result = false; + } + // Skip SeqId Files + if (isSequenceIdFile(p)) { + result = false; + } + } catch (IOException e) { + LOG.warn("Failed isFile check on {}", p, e); + } + return result; + } + }); + if (ArrayUtils.isNotEmpty(files)) { + Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath())); + } + return filesSorted; + } + + /** + * Move aside a bad edits file. + * @param walFS WAL FileSystem used to rename bad edits file. + * @param edits Edits file to move aside. + * @return The name of the moved aside file. + * @throws IOException + */ + public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits) + throws IOException { + Path moveAsideName = + new Path(edits.getParent(), edits.getName() + "." + System.currentTimeMillis()); + if (!walFS.rename(edits, moveAsideName)) { + LOG.warn("Rename failed from {} to {}", edits, moveAsideName); + } + return moveAsideName; + } + + /** + * Is the given file a region open sequence id file. + */ + @VisibleForTesting + public static boolean isSequenceIdFile(final Path file) { + return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX) + || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); + } + + private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir) + throws IOException { + // TODO: Why are we using a method in here as part of our normal region open where + // there is no splitting involved? Fix. St.Ack 01/20/2017. + Path editsDir = getRegionDirRecoveredEditsDir(regionDir); + try { + FileStatus[] files = walFS.listStatus(editsDir, WALSplitUtil::isSequenceIdFile); + return files != null ? 
files : new FileStatus[0]; + } catch (FileNotFoundException e) { + return new FileStatus[0]; + } + } + + private static long getMaxSequenceId(FileStatus[] files) { + long maxSeqId = -1L; + for (FileStatus file : files) { + String fileName = file.getPath().getName(); + try { + maxSeqId = Math.max(maxSeqId, Long + .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH))); + } catch (NumberFormatException ex) { + LOG.warn("Invalid SeqId File Name={}", fileName); + } + } + return maxSeqId; + } + + /** + * Get the max sequence id which is stored in the region directory. -1 if none. + */ + public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException { + return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir)); + } + + /** + * Create a file with name as region's max sequence id + */ + public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId) + throws IOException { + FileStatus[] files = getSequenceIdFiles(walFS, regionDir); + long maxSeqId = getMaxSequenceId(files); + if (maxSeqId > newMaxSeqId) { + throw new IOException("The new max sequence id " + newMaxSeqId + + " is less than the old max sequence id " + maxSeqId); + } + // write a new seqId file + Path newSeqIdFile = + new Path(getRegionDirRecoveredEditsDir(regionDir), newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); + if (newMaxSeqId != maxSeqId) { + try { + if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { + throw new IOException("Failed to create SeqId file:" + newSeqIdFile); + } + LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, + maxSeqId); + } catch (FileAlreadyExistsException ignored) { + // latest hdfs throws this exception. it's all right if newSeqIdFile already exists + } + } + // remove old ones + for (FileStatus status : files) { + if (!newSeqIdFile.equals(status.getPath())) { + walFS.delete(status.getPath(), false); + } + } + } + + /** A struct used by getMutationsFromWALEntry */ + public static class MutationReplay implements Comparable { + public MutationReplay(ClientProtos.MutationProto.MutationType type, Mutation mutation, + long nonceGroup, long nonce) { + this.type = type; + this.mutation = mutation; + if (this.mutation.getDurability() != Durability.SKIP_WAL) { + // using ASYNC_WAL for relay + this.mutation.setDurability(Durability.ASYNC_WAL); + } + this.nonceGroup = nonceGroup; + this.nonce = nonce; + } + + private final ClientProtos.MutationProto.MutationType type; + public final Mutation mutation; + public final long nonceGroup; + public final long nonce; + + @Override + public int compareTo(final MutationReplay d) { + return this.mutation.compareTo(d.mutation); + } + + @Override + public boolean equals(Object obj) { + if (!(obj instanceof MutationReplay)) { + return false; + } else { + return this.compareTo((MutationReplay) obj) == 0; + } + } + + @Override + public int hashCode() { + return this.mutation.hashCode(); + } + + public ClientProtos.MutationProto.MutationType getType() { + return type; + } + } + + /** + * This function is used to construct mutations from a WALEntry. It also reconstructs WALKey & + * WALEdit from the passed in WALEntry + * @param entry + * @param cells + * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances + * extracted from the passed in WALEntry. 
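A hedged sketch of how a replay path might consume the helper documented above; the surrounding handler method is hypothetical, and the entry and cells are assumed to arrive from a replicateWALEntry-style RPC.

import java.io.IOException;
import java.util.List;

import org.apache.hadoop.hbase.CellScanner;
import org.apache.hadoop.hbase.client.Durability;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.wal.WALEdit;
import org.apache.hadoop.hbase.wal.WALKeyImpl;
import org.apache.hadoop.hbase.wal.WALSplitUtil;
import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;

public class ReplayEntrySketch {
  void handleReplayEntry(AdminProtos.WALEntry entry, CellScanner cells) throws IOException {
    // Filled in by the call with the reconstructed WALKey and WALEdit.
    Pair<WALKeyImpl, WALEdit> walEntry = new Pair<>();
    List<MutationReplay> mutations =
        WALSplitUtil.getMutationsFromWALEntry(entry, cells, walEntry, Durability.USE_DEFAULT);
    for (MutationReplay replay : mutations) {
      // Apply replay.mutation (a Put or Delete) to the target region here; left abstract.
    }
  }
}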
+ * @return list of Pair<MutationType, Mutation> to be replayed + * @throws IOException + */ + public static List getMutationsFromWALEntry(AdminProtos.WALEntry entry, + CellScanner cells, Pair logEntry, Durability durability) throws IOException { + if (entry == null) { + // return an empty array + return Collections.emptyList(); + } + + long replaySeqId = + (entry.getKey().hasOrigSequenceNumber()) ? entry.getKey().getOrigSequenceNumber() + : entry.getKey().getLogSequenceNumber(); + int count = entry.getAssociatedCellCount(); + List mutations = new ArrayList<>(); + Cell previousCell = null; + Mutation m = null; + WALKeyImpl key = null; + WALEdit val = null; + if (logEntry != null) { + val = new WALEdit(); + } + + for (int i = 0; i < count; i++) { + // Throw index out of bounds if our cell count is off + if (!cells.advance()) { + throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); + } + Cell cell = cells.current(); + if (val != null) val.add(cell); + + boolean isNewRowOrType = + previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() + || !CellUtil.matchingRows(previousCell, cell); + if (isNewRowOrType) { + // Create new mutation + if (CellUtil.isDelete(cell)) { + m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + // Deletes don't have nonces. + mutations.add(new MutationReplay(ClientProtos.MutationProto.MutationType.DELETE, m, + HConstants.NO_NONCE, HConstants.NO_NONCE)); + } else { + m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); + // Puts might come from increment or append, thus we need nonces. + long nonceGroup = + entry.getKey().hasNonceGroup() ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; + long nonce = entry.getKey().hasNonce() ? entry.getKey().getNonce() : HConstants.NO_NONCE; + mutations.add( + new MutationReplay(ClientProtos.MutationProto.MutationType.PUT, m, nonceGroup, nonce)); + } + } + if (CellUtil.isDelete(cell)) { + ((Delete) m).add(cell); + } else { + ((Put) m).add(cell); + } + m.setDurability(durability); + previousCell = cell; + } + + // reconstruct WALKey + if (logEntry != null) { + org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto = + entry.getKey(); + List clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount()); + for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { + clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); + } + key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), + TableName.valueOf(walKeyProto.getTableName().toByteArray()), replaySeqId, + walKeyProto.getWriteTime(), clusterIds, walKeyProto.getNonceGroup(), + walKeyProto.getNonce(), null); + logEntry.setFirst(key); + logEntry.setSecond(val); + } + + return mutations; + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index c436db2b7a4..300fbf63c6d 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -17,74 +17,43 @@ */ package org.apache.hadoop.hbase.wal; +import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile; + import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InterruptedIOException; import java.text.ParseException; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collections; 
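Stepping back briefly to the sequence-id helpers defined in WALSplitUtil above, a minimal bookkeeping sketch, assuming a WAL filesystem is reachable; the region directory is illustrative only.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.wal.WALSplitUtil;

public class SequenceIdFileSketch {
  public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    FileSystem walFS = FSUtils.getWALFileSystem(conf);
    // Illustrative region dir; callers normally resolve it via FSUtils.getWALRegionDir(...).
    Path regionDir = new Path(FSUtils.getWALRootDir(conf), "data/default/t1/1588230740");
    long maxSeqId = WALSplitUtil.getMaxRegionSequenceId(walFS, regionDir); // -1 if none yet
    // Record a strictly larger id; the call also cleans up any older *.seqid files.
    WALSplitUtil.writeRegionSequenceIdFile(walFS, regionDir, Math.max(maxSeqId, 0) + 1);
  }
}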
-import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.NavigableSet; -import java.util.Set; import java.util.TreeMap; -import java.util.TreeSet; -import java.util.UUID; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; -import java.util.regex.Matcher; -import java.util.regex.Pattern; import org.apache.commons.lang3.ArrayUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileAlreadyExistsException; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.PathFilter; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellScanner; -import org.apache.hadoop.hbase.CellUtil; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Delete; -import org.apache.hadoop.hbase.client.Durability; -import org.apache.hadoop.hbase.client.Mutation; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.RegionInfo; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.io.HeapSize; -import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; -import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.LastSequenceId; -import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Pair; -import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog; -import org.apache.hadoop.io.MultipleIOException; import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; @@ -92,16 +61,10 @@ import org.slf4j.LoggerFactory; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; -import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; -import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; /** * This class is responsible for splitting up a bunch of regionserver commit log * files that are no longer being written to, into new files, one per region, for @@ -148,12 +111,11 @@ public class WALSplitter { @VisibleForTesting - WALSplitter(final WALFactory factory, Configuration conf, Path walDir, - FileSystem walFS, LastSequenceId idChecker, - SplitLogWorkerCoordination splitLogWorkerCoordination) { + WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS, + LastSequenceId idChecker, SplitLogWorkerCoordination splitLogWorkerCoordination) { this.conf = HBaseConfiguration.create(conf); - String codecClassName = conf - .get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); + String codecClassName = + conf.get(WALCellCodec.WAL_CELL_CODEC_CLASS_KEY, WALCellCodec.class.getName()); this.conf.set(HConstants.RPC_CODEC_CONF_KEY, codecClassName); this.walDir = walDir; this.walFS = walFS; @@ -170,14 +132,27 @@ public class WALSplitter { splitWriterCreationBounded); int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - if(splitWriterCreationBounded){ - outputSink = new BoundedLogWriterCreationOutputSink( - controller, entryBuffers, numWriterThreads); - }else { - outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads); + if (splitWriterCreationBounded) { + outputSink = + new BoundedLogWriterCreationOutputSink(this, controller, entryBuffers, numWriterThreads); + } else { + outputSink = + new LogRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads); } } + WALFactory getWalFactory(){ + return this.walFactory; + } + + FileStatus getFileBeingSplit() { + return fileBeingSplit; + } + + Map> getRegionMaxSeqIdInStores() { + return regionMaxSeqIdInStores; + } + /** * Splits a WAL file into region's recovered-edits directory. * This is the main entry point for distributed log splitting from SplitLogWorker. @@ -360,358 +335,8 @@ public class WALSplitter { return !progress_failed; } - /** - * Completes the work done by splitLogFile by archiving logs - *

- * It is invoked by SplitLogManager once it knows that one of the - * SplitLogWorkers have completed the splitLogFile() part. If the master - * crashes then this function might get called multiple times. - *

- * @param logfile - * @param conf - * @throws IOException - */ - public static void finishSplitLogFile(String logfile, - Configuration conf) throws IOException { - Path walDir = FSUtils.getWALRootDir(conf); - Path oldLogDir = new Path(walDir, HConstants.HREGION_OLDLOGDIR_NAME); - Path logPath; - if (FSUtils.isStartingWithPath(walDir, logfile)) { - logPath = new Path(logfile); - } else { - logPath = new Path(walDir, logfile); - } - finishSplitLogFile(walDir, oldLogDir, logPath, conf); - } - - private static void finishSplitLogFile(Path walDir, Path oldLogDir, - Path logPath, Configuration conf) throws IOException { - List processedLogs = new ArrayList<>(); - List corruptedLogs = new ArrayList<>(); - FileSystem walFS = walDir.getFileSystem(conf); - if (ZKSplitLog.isCorrupted(walDir, logPath.getName(), walFS)) { - corruptedLogs.add(logPath); - } else { - processedLogs.add(logPath); - } - archiveLogs(corruptedLogs, processedLogs, oldLogDir, walFS, conf); - Path stagingDir = ZKSplitLog.getSplitLogDir(walDir, logPath.getName()); - walFS.delete(stagingDir, true); - } - - /** - * Moves processed logs to a oldLogDir after successful processing Moves - * corrupted logs (any log that couldn't be successfully parsed to corruptDir - * (.corrupt) for later investigation - * - * @param corruptedLogs - * @param processedLogs - * @param oldLogDir - * @param walFS WAL FileSystem to archive files on. - * @param conf - * @throws IOException - */ - private static void archiveLogs( - final List corruptedLogs, - final List processedLogs, final Path oldLogDir, - final FileSystem walFS, final Configuration conf) throws IOException { - final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME); - if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) { - LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}", - corruptDir); - } - if (!walFS.mkdirs(corruptDir)) { - LOG.info("Unable to mkdir {}", corruptDir); - } - walFS.mkdirs(oldLogDir); - - // this method can get restarted or called multiple times for archiving - // the same log files. - for (Path corrupted : corruptedLogs) { - Path p = new Path(corruptDir, corrupted.getName()); - if (walFS.exists(corrupted)) { - if (!walFS.rename(corrupted, p)) { - LOG.warn("Unable to move corrupted log {} to {}", corrupted, p); - } else { - LOG.warn("Moved corrupted log {} to {}", corrupted, p); - } - } - } - - for (Path p : processedLogs) { - Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p); - if (walFS.exists(p)) { - if (!FSUtils.renameAndSetModifyTime(walFS, p, newPath)) { - LOG.warn("Unable to move {} to {}", p, newPath); - } else { - LOG.info("Archived processed log {} to {}", p, newPath); - } - } - } - } - - /** - * Path to a file under RECOVERED_EDITS_DIR directory of the region found in - * logEntry named for the sequenceid in the passed - * logEntry: e.g. /hbase/some_table/2323432434/recovered.edits/2332. - * This method also ensures existence of RECOVERED_EDITS_DIR under the region - * creating it if necessary. - * @param logEntry - * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name. - * @param tmpDirName of the directory used to sideline old recovered edits file - * @param conf - * @return Path to file into which to dump split log edits. 
- * @throws IOException - */ - @SuppressWarnings("deprecation") - @VisibleForTesting - static Path getRegionSplitEditsPath(final Entry logEntry, String fileNameBeingSplit, - String tmpDirName, Configuration conf) throws IOException { - FileSystem walFS = FSUtils.getWALFileSystem(conf); - Path tableDir = FSUtils.getWALTableDir(conf, logEntry.getKey().getTableName()); - String encodedRegionName = Bytes.toString(logEntry.getKey().getEncodedRegionName()); - Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName); - Path dir = getRegionDirRecoveredEditsDir(regionDir); - - - if (walFS.exists(dir) && walFS.isFile(dir)) { - Path tmp = new Path(tmpDirName); - if (!walFS.exists(tmp)) { - walFS.mkdirs(tmp); - } - tmp = new Path(tmp, - HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); - LOG.warn("Found existing old file: {}. It could be some " - + "leftover of an old installation. It should be a folder instead. " - + "So moving it to {}", dir, tmp); - if (!walFS.rename(dir, tmp)) { - LOG.warn("Failed to sideline old file {}", dir); - } - } - - if (!walFS.exists(dir) && !walFS.mkdirs(dir)) { - LOG.warn("mkdir failed on {}", dir); - } - // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. - // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure - // region's replayRecoveredEdits will not delete it - String fileName = formatRecoveredEditsFileName(logEntry.getKey().getSequenceId()); - fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit); - return new Path(dir, fileName); - } - - private static String getTmpRecoveredEditsFileName(String fileName) { - return fileName + RECOVERED_LOG_TMPFILE_SUFFIX; - } - - /** - * Get the completed recovered edits file path, renaming it to be by last edit - * in the file from its first edit. Then we could use the name to skip - * recovered edits when doing {@link HRegion#replayRecoveredEditsIfAny}. - * @param srcPath - * @param maximumEditLogSeqNum - * @return dstPath take file's last edit log seq num as the name - */ - private static Path getCompletedRecoveredEditsFilePath(Path srcPath, - long maximumEditLogSeqNum) { - String fileName = formatRecoveredEditsFileName(maximumEditLogSeqNum); - return new Path(srcPath.getParent(), fileName); - } - - @VisibleForTesting - static String formatRecoveredEditsFileName(final long seqid) { - return String.format("%019d", seqid); - } - - private static final Pattern EDITFILES_NAME_PATTERN = Pattern.compile("-?[0-9]+"); - private static final String RECOVERED_LOG_TMPFILE_SUFFIX = ".temp"; - - /** - * @param regionDir - * This regions directory in the filesystem. - * @return The directory that holds recovered edits files for the region - * regionDir - */ - public static Path getRegionDirRecoveredEditsDir(final Path regionDir) { - return new Path(regionDir, HConstants.RECOVERED_EDITS_DIR); - } - - /** - * Check whether there is recovered.edits in the region dir - * @param conf conf - * @param regionInfo the region to check - * @throws IOException IOException - * @return true if recovered.edits exist in the region dir - */ - public static boolean hasRecoveredEdits(final Configuration conf, - final RegionInfo regionInfo) throws IOException { - // No recovered.edits for non default replica regions - if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) { - return false; - } - //Only default replica region can reach here, so we can use regioninfo - //directly without converting it to default replica's regioninfo. 
- Path regionDir = FSUtils.getWALRegionDir(conf, regionInfo.getTable(), - regionInfo.getEncodedName()); - NavigableSet files = getSplitEditFilesSorted(FSUtils.getWALFileSystem(conf), regionDir); - return files != null && !files.isEmpty(); - } - - - /** - * Returns sorted set of edit files made by splitter, excluding files - * with '.temp' suffix. - * - * @param walFS WAL FileSystem used to retrieving split edits files. - * @param regionDir WAL region dir to look for recovered edits files under. - * @return Files in passed regionDir as a sorted set. - * @throws IOException - */ - public static NavigableSet getSplitEditFilesSorted(final FileSystem walFS, - final Path regionDir) throws IOException { - NavigableSet filesSorted = new TreeSet<>(); - Path editsdir = getRegionDirRecoveredEditsDir(regionDir); - if (!walFS.exists(editsdir)) { - return filesSorted; - } - FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { - @Override - public boolean accept(Path p) { - boolean result = false; - try { - // Return files and only files that match the editfile names pattern. - // There can be other files in this directory other than edit files. - // In particular, on error, we'll move aside the bad edit file giving - // it a timestamp suffix. See moveAsideBadEditsFile. - Matcher m = EDITFILES_NAME_PATTERN.matcher(p.getName()); - result = walFS.isFile(p) && m.matches(); - // Skip the file whose name ends with RECOVERED_LOG_TMPFILE_SUFFIX, - // because it means splitwal thread is writting this file. - if (p.getName().endsWith(RECOVERED_LOG_TMPFILE_SUFFIX)) { - result = false; - } - // Skip SeqId Files - if (isSequenceIdFile(p)) { - result = false; - } - } catch (IOException e) { - LOG.warn("Failed isFile check on {}", p, e); - } - return result; - } - }); - if (ArrayUtils.isNotEmpty(files)) { - Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath())); - } - return filesSorted; - } - - /** - * Move aside a bad edits file. - * - * @param walFS WAL FileSystem used to rename bad edits file. - * @param edits - * Edits file to move aside. - * @return The name of the moved aside file. - * @throws IOException - */ - public static Path moveAsideBadEditsFile(final FileSystem walFS, final Path edits) - throws IOException { - Path moveAsideName = new Path(edits.getParent(), edits.getName() + "." - + System.currentTimeMillis()); - if (!walFS.rename(edits, moveAsideName)) { - LOG.warn("Rename failed from {} to {}", edits, moveAsideName); - } - return moveAsideName; - } - - private static final String SEQUENCE_ID_FILE_SUFFIX = ".seqid"; - private static final String OLD_SEQUENCE_ID_FILE_SUFFIX = "_seqid"; - private static final int SEQUENCE_ID_FILE_SUFFIX_LENGTH = SEQUENCE_ID_FILE_SUFFIX.length(); - - /** - * Is the given file a region open sequence id file. - */ - @VisibleForTesting - public static boolean isSequenceIdFile(final Path file) { - return file.getName().endsWith(SEQUENCE_ID_FILE_SUFFIX) - || file.getName().endsWith(OLD_SEQUENCE_ID_FILE_SUFFIX); - } - - private static FileStatus[] getSequenceIdFiles(FileSystem walFS, Path regionDir) - throws IOException { - // TODO: Why are we using a method in here as part of our normal region open where - // there is no splitting involved? Fix. St.Ack 01/20/2017. - Path editsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); - try { - FileStatus[] files = walFS.listStatus(editsDir, WALSplitter::isSequenceIdFile); - return files != null ? 
files : new FileStatus[0]; - } catch (FileNotFoundException e) { - return new FileStatus[0]; - } - } - - private static long getMaxSequenceId(FileStatus[] files) { - long maxSeqId = -1L; - for (FileStatus file : files) { - String fileName = file.getPath().getName(); - try { - maxSeqId = Math.max(maxSeqId, Long - .parseLong(fileName.substring(0, fileName.length() - SEQUENCE_ID_FILE_SUFFIX_LENGTH))); - } catch (NumberFormatException ex) { - LOG.warn("Invalid SeqId File Name={}", fileName); - } - } - return maxSeqId; - } - - /** - * Get the max sequence id which is stored in the region directory. -1 if none. - */ - public static long getMaxRegionSequenceId(FileSystem walFS, Path regionDir) throws IOException { - return getMaxSequenceId(getSequenceIdFiles(walFS, regionDir)); - } - - /** - * Create a file with name as region's max sequence id - */ - public static void writeRegionSequenceIdFile(FileSystem walFS, Path regionDir, long newMaxSeqId) - throws IOException { - FileStatus[] files = getSequenceIdFiles(walFS, regionDir); - long maxSeqId = getMaxSequenceId(files); - if (maxSeqId > newMaxSeqId) { - throw new IOException("The new max sequence id " + newMaxSeqId + - " is less than the old max sequence id " + maxSeqId); - } - // write a new seqId file - Path newSeqIdFile = new Path(WALSplitter.getRegionDirRecoveredEditsDir(regionDir), - newMaxSeqId + SEQUENCE_ID_FILE_SUFFIX); - if (newMaxSeqId != maxSeqId) { - try { - if (!walFS.createNewFile(newSeqIdFile) && !walFS.exists(newSeqIdFile)) { - throw new IOException("Failed to create SeqId file:" + newSeqIdFile); - } - LOG.debug("Wrote file={}, newMaxSeqId={}, maxSeqId={}", newSeqIdFile, newMaxSeqId, - maxSeqId); - } catch (FileAlreadyExistsException ignored) { - // latest hdfs throws this exception. it's all right if newSeqIdFile already exists - } - } - // remove old ones - for (FileStatus status : files) { - if (!newSeqIdFile.equals(status.getPath())) { - walFS.delete(status.getPath(), false); - } - } - } - /** * Create a new {@link Reader} for reading logs to split. - * - * @param file - * @return A new Reader instance, caller should close - * @throws IOException - * @throws CorruptedLogFileException */ protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter) throws IOException, CorruptedLogFileException { @@ -761,7 +386,7 @@ public class WALSplitter { } static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) - throws CorruptedLogFileException, IOException { + throws CorruptedLogFileException, IOException { try { return in.next(); } catch (EOFException eof) { @@ -771,9 +396,8 @@ public class WALSplitter { } catch (IOException e) { // If the IOE resulted from bad file format, // then this problem is idempotent and retrying won't help - if (e.getCause() != null && - (e.getCause() instanceof ParseException || - e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { + if (e.getCause() != null && (e.getCause() instanceof ParseException + || e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) { LOG.warn("Parse exception from wal {}. Continuing", path, e); return null; } @@ -781,19 +405,18 @@ public class WALSplitter { throw e; } CorruptedLogFileException t = - new CorruptedLogFileException("skipErrors=true Ignoring exception" + - " while parsing wal " + path + ". Marking as corrupted"); + new CorruptedLogFileException("skipErrors=true Ignoring exception" + " while parsing wal " + + path + ". 
Marking as corrupted"); t.initCause(e); throw t; } } /** - * Create a new {@link Writer} for writing log splits. + * Create a new {@link WALProvider.Writer} for writing log splits. * @return a new Writer instance, caller should close */ - protected Writer createWriter(Path logfile) - throws IOException { + protected WALProvider.Writer createWriter(Path logfile) throws IOException { return walFactory.createRecoveredEditsWriter(walFS, logfile); } @@ -826,7 +449,7 @@ public class WALSplitter { // Wait/notify for when data has been produced by the writer thread, // consumed by the reader thread, or an exception occurred - public final Object dataAvailable = new Object(); + final Object dataAvailable = new Object(); void writerThreadError(Throwable t) { thrown.compareAndSet(null, t); @@ -837,7 +460,9 @@ public class WALSplitter { */ void checkForErrors() throws IOException { Throwable thrown = this.thrown.get(); - if (thrown == null) return; + if (thrown == null) { + return; + } if (thrown instanceof IOException) { throw new IOException(thrown); } else { @@ -846,134 +471,6 @@ public class WALSplitter { } } - /** - * Class which accumulates edits and separates them into a buffer per region - * while simultaneously accounting RAM usage. Blocks if the RAM usage crosses - * a predefined threshold. - * - * Writer threads then pull region-specific buffers from this class. - */ - public static class EntryBuffers { - PipelineController controller; - - Map buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR); - - /* Track which regions are currently in the middle of writing. We don't allow - an IO thread to pick up bytes from a region if we're already writing - data for that region in a different IO thread. */ - Set currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR); - - long totalBuffered = 0; - long maxHeapUsage; - boolean splitWriterCreationBounded; - - public EntryBuffers(PipelineController controller, long maxHeapUsage) { - this(controller, maxHeapUsage, false); - } - - public EntryBuffers(PipelineController controller, long maxHeapUsage, - boolean splitWriterCreationBounded){ - this.controller = controller; - this.maxHeapUsage = maxHeapUsage; - this.splitWriterCreationBounded = splitWriterCreationBounded; - } - - /** - * Append a log entry into the corresponding region buffer. - * Blocks if the total heap usage has crossed the specified threshold. - * - * @throws InterruptedException - * @throws IOException - */ - public void appendEntry(Entry entry) throws InterruptedException, IOException { - WALKey key = entry.getKey(); - - RegionEntryBuffer buffer; - long incrHeap; - synchronized (this) { - buffer = buffers.get(key.getEncodedRegionName()); - if (buffer == null) { - buffer = new RegionEntryBuffer(key.getTableName(), key.getEncodedRegionName()); - buffers.put(key.getEncodedRegionName(), buffer); - } - incrHeap= buffer.appendEntry(entry); - } - - // If we crossed the chunk threshold, wait for more space to be available - synchronized (controller.dataAvailable) { - totalBuffered += incrHeap; - while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) { - LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered); - controller.dataAvailable.wait(2000); - } - controller.dataAvailable.notifyAll(); - } - controller.checkForErrors(); - } - - /** - * @return RegionEntryBuffer a buffer of edits to be written. 
- */ - synchronized RegionEntryBuffer getChunkToWrite() { - // The core part of limiting opening writers is it doesn't return chunk only if the - // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each - // region during splitting. It will flush all the logs in the buffer after splitting - // through a threadpool, which means the number of writers it created is under control. - if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) { - return null; - } - long biggestSize = 0; - byte[] biggestBufferKey = null; - - for (Map.Entry entry : buffers.entrySet()) { - long size = entry.getValue().heapSize(); - if (size > biggestSize && (!currentlyWriting.contains(entry.getKey()))) { - biggestSize = size; - biggestBufferKey = entry.getKey(); - } - } - if (biggestBufferKey == null) { - return null; - } - - RegionEntryBuffer buffer = buffers.remove(biggestBufferKey); - currentlyWriting.add(biggestBufferKey); - return buffer; - } - - void doneWriting(RegionEntryBuffer buffer) { - synchronized (this) { - boolean removed = currentlyWriting.remove(buffer.encodedRegionName); - assert removed; - } - long size = buffer.heapSize(); - - synchronized (controller.dataAvailable) { - totalBuffered -= size; - // We may unblock writers - controller.dataAvailable.notifyAll(); - } - } - - synchronized boolean isRegionCurrentlyWriting(byte[] region) { - return currentlyWriting.contains(region); - } - - public void waitUntilDrained() { - synchronized (controller.dataAvailable) { - while (totalBuffered > 0) { - try { - controller.dataAvailable.wait(2000); - } catch (InterruptedException e) { - LOG.warn("Got interrupted while waiting for EntryBuffers is drained"); - Thread.interrupted(); - break; - } - } - } - } - } - /** * A buffer of some number of edits for a given region. 
* This accumulates edits and also provides a memory optimization in order to @@ -1026,723 +523,6 @@ public class WALSplitter { } } - public static class WriterThread extends Thread { - private volatile boolean shouldStop = false; - private PipelineController controller; - private EntryBuffers entryBuffers; - private OutputSink outputSink = null; - - WriterThread(PipelineController controller, EntryBuffers entryBuffers, OutputSink sink, int i){ - super(Thread.currentThread().getName() + "-Writer-" + i); - this.controller = controller; - this.entryBuffers = entryBuffers; - outputSink = sink; - } - - @Override - public void run() { - try { - doRun(); - } catch (Throwable t) { - LOG.error("Exiting thread", t); - controller.writerThreadError(t); - } - } - - private void doRun() throws IOException { - LOG.trace("Writer thread starting"); - while (true) { - RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); - if (buffer == null) { - // No data currently available, wait on some more to show up - synchronized (controller.dataAvailable) { - if (shouldStop && !this.outputSink.flush()) { - return; - } - try { - controller.dataAvailable.wait(500); - } catch (InterruptedException ie) { - if (!shouldStop) { - throw new RuntimeException(ie); - } - } - } - continue; - } - - assert buffer != null; - try { - writeBuffer(buffer); - } finally { - entryBuffers.doneWriting(buffer); - } - } - } - - private void writeBuffer(RegionEntryBuffer buffer) throws IOException { - outputSink.append(buffer); - } - - void finish() { - synchronized (controller.dataAvailable) { - shouldStop = true; - controller.dataAvailable.notifyAll(); - } - } - } - - /** - * The following class is an abstraction class to provide a common interface to support - * different ways of consuming recovered edits. - */ - public static abstract class OutputSink { - - protected PipelineController controller; - protected EntryBuffers entryBuffers; - - protected ConcurrentHashMap writers = new ConcurrentHashMap<>(); - protected final ConcurrentHashMap regionMaximumEditLogSeqNum = - new ConcurrentHashMap<>(); - - - protected final List writerThreads = Lists.newArrayList(); - - /* Set of regions which we've decided should not output edits */ - protected final Set blacklistedRegions = Collections - .synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR)); - - protected boolean closeAndCleanCompleted = false; - - protected boolean writersClosed = false; - - protected final int numThreads; - - protected CancelableProgressable reporter = null; - - protected AtomicLong skippedEdits = new AtomicLong(); - - protected List splits = null; - - public OutputSink(PipelineController controller, EntryBuffers entryBuffers, int numWriters) { - numThreads = numWriters; - this.controller = controller; - this.entryBuffers = entryBuffers; - } - - void setReporter(CancelableProgressable reporter) { - this.reporter = reporter; - } - - /** - * Start the threads that will pump data from the entryBuffers to the output files. - */ - public synchronized void startWriterThreads() { - for (int i = 0; i < numThreads; i++) { - WriterThread t = new WriterThread(controller, entryBuffers, this, i); - t.start(); - writerThreads.add(t); - } - } - - /** - * - * Update region's maximum edit log SeqNum. 
- */ - void updateRegionMaximumEditLogSeqNum(Entry entry) { - synchronized (regionMaximumEditLogSeqNum) { - String regionName = Bytes.toString(entry.getKey().getEncodedRegionName()); - Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName); - if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { - regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId()); - } - } - } - - /** - * @return the number of currently opened writers - */ - int getNumOpenWriters() { - return this.writers.size(); - } - - long getSkippedEdits() { - return this.skippedEdits.get(); - } - - /** - * Wait for writer threads to dump all info to the sink - * @return true when there is no error - * @throws IOException - */ - protected boolean finishWriting(boolean interrupt) throws IOException { - LOG.debug("Waiting for split writer threads to finish"); - boolean progress_failed = false; - for (WriterThread t : writerThreads) { - t.finish(); - } - if (interrupt) { - for (WriterThread t : writerThreads) { - t.interrupt(); // interrupt the writer threads. We are stopping now. - } - } - - for (WriterThread t : writerThreads) { - if (!progress_failed && reporter != null && !reporter.progress()) { - progress_failed = true; - } - try { - t.join(); - } catch (InterruptedException ie) { - IOException iie = new InterruptedIOException(); - iie.initCause(ie); - throw iie; - } - } - controller.checkForErrors(); - LOG.info("{} split writers finished; closing.", this.writerThreads.size()); - return (!progress_failed); - } - - public abstract List finishWritingAndClose() throws IOException; - - /** - * @return a map from encoded region ID to the number of edits written out for that region. - */ - public abstract Map getOutputCounts(); - - /** - * @return number of regions we've recovered - */ - public abstract int getNumberOfRecoveredRegions(); - - /** - * @param buffer A WAL Edit Entry - * @throws IOException - */ - public abstract void append(RegionEntryBuffer buffer) throws IOException; - - /** - * WriterThread call this function to help flush internal remaining edits in buffer before close - * @return true when underlying sink has something to flush - */ - public boolean flush() throws IOException { - return false; - } - - /** - * Some WALEdit's contain only KV's for account on what happened to a region. - * Not all sinks will want to get all of those edits. - * - * @return Return true if this sink wants to accept this region-level WALEdit. - */ - public abstract boolean keepRegionEvent(Entry entry); - } - - /** - * Class that manages the output streams from the log splitting process. - */ - class LogRecoveredEditsOutputSink extends OutputSink { - - public LogRecoveredEditsOutputSink(PipelineController controller, EntryBuffers entryBuffers, - int numWriters) { - // More threads could potentially write faster at the expense - // of causing more disk seeks as the logs are split. - // 3. After a certain setting (probably around 3) the - // process will be bound on the reader in the current - // implementation anyway. 
- super(controller, entryBuffers, numWriters); - } - - /** - * @return null if failed to report progress - * @throws IOException - */ - @Override - public List finishWritingAndClose() throws IOException { - boolean isSuccessful = false; - List result = null; - try { - isSuccessful = finishWriting(false); - } finally { - result = close(); - List thrown = closeLogWriters(null); - if (CollectionUtils.isNotEmpty(thrown)) { - throw MultipleIOException.createIOException(thrown); - } - } - if (isSuccessful) { - splits = result; - } - return splits; - } - - // delete the one with fewer wal entries - private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) - throws IOException { - long dstMinLogSeqNum = -1L; - try (WAL.Reader reader = walFactory.createReader(walFS, dst)) { - WAL.Entry entry = reader.next(); - if (entry != null) { - dstMinLogSeqNum = entry.getKey().getSequenceId(); - } - } catch (EOFException e) { - LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", - dst, e); - } - if (wap.minLogSeqNum < dstMinLogSeqNum) { - LOG.warn("Found existing old edits file. It could be the result of a previous failed" - + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" - + walFS.getFileStatus(dst).getLen()); - if (!walFS.delete(dst, false)) { - LOG.warn("Failed deleting of old {}", dst); - throw new IOException("Failed deleting of old " + dst); - } - } else { - LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p - + ", length=" + walFS.getFileStatus(wap.p).getLen()); - if (!walFS.delete(wap.p, false)) { - LOG.warn("Failed deleting of {}", wap.p); - throw new IOException("Failed deleting of " + wap.p); - } - } - } - - /** - * Close all of the output streams. - * @return the list of paths written. 
- */ - List close() throws IOException { - Preconditions.checkState(!closeAndCleanCompleted); - - final List paths = new ArrayList<>(); - final List thrown = Lists.newArrayList(); - ThreadPoolExecutor closeThreadPool = Threads - .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { - private int count = 1; - - @Override public Thread newThread(Runnable r) { - Thread t = new Thread(r, "split-log-closeStream-" + count++); - return t; - } - }); - CompletionService completionService = new ExecutorCompletionService<>(closeThreadPool); - boolean progress_failed; - try { - progress_failed = executeCloseTask(completionService, thrown, paths); - } catch (InterruptedException e) { - IOException iie = new InterruptedIOException(); - iie.initCause(e); - throw iie; - } catch (ExecutionException e) { - throw new IOException(e.getCause()); - } finally { - closeThreadPool.shutdownNow(); - } - if (!thrown.isEmpty()) { - throw MultipleIOException.createIOException(thrown); - } - writersClosed = true; - closeAndCleanCompleted = true; - if (progress_failed) { - return null; - } - return paths; - } - - /** - * @param completionService threadPool to execute the closing tasks - * @param thrown store the exceptions - * @param paths arrayList to store the paths written - * @return if close tasks executed successful - */ - boolean executeCloseTask(CompletionService completionService, - List thrown, List paths) - throws InterruptedException, ExecutionException { - for (final Map.Entry writersEntry : writers.entrySet()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p); - } - completionService.submit(new Callable() { - @Override public Void call() throws Exception { - WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); - Path dst = closeWriter(writersEntry.getKey(), wap, thrown); - paths.add(dst); - return null; - } - }); - } - boolean progress_failed = false; - for (int i = 0, n = this.writers.size(); i < n; i++) { - Future future = completionService.take(); - future.get(); - if (!progress_failed && reporter != null && !reporter.progress()) { - progress_failed = true; - } - } - return progress_failed; - } - - Path closeWriter(String encodedRegionName, WriterAndPath wap, - List thrown) throws IOException{ - LOG.trace("Closing " + wap.p); - try { - wap.w.close(); - } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); - thrown.add(ioe); - return null; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten - + " edits, skipped " + wap.editsSkipped + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms"); - } - if (wap.editsWritten == 0) { - // just remove the empty recovered.edits file - if (walFS.exists(wap.p) && !walFS.delete(wap.p, false)) { - LOG.warn("Failed deleting empty " + wap.p); - throw new IOException("Failed deleting empty " + wap.p); - } - return null; - } - - Path dst = getCompletedRecoveredEditsFilePath(wap.p, - regionMaximumEditLogSeqNum.get(encodedRegionName)); - try { - if (!dst.equals(wap.p) && walFS.exists(dst)) { - deleteOneWithFewerEntries(wap, dst); - } - // Skip the unit tests which create a splitter that reads and - // writes the data without touching disk. - // TestHLogSplit#testThreading is an example. 
- if (walFS.exists(wap.p)) { - if (!walFS.rename(wap.p, dst)) { - throw new IOException("Failed renaming " + wap.p + " to " + dst); - } - LOG.info("Rename " + wap.p + " to " + dst); - } - } catch (IOException ioe) { - LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe); - thrown.add(ioe); - return null; - } - return dst; - } - - private List closeLogWriters(List thrown) throws IOException { - if (writersClosed) { - return thrown; - } - if (thrown == null) { - thrown = Lists.newArrayList(); - } - try { - for (WriterThread t : writerThreads) { - while (t.isAlive()) { - t.shouldStop = true; - t.interrupt(); - try { - t.join(10); - } catch (InterruptedException e) { - IOException iie = new InterruptedIOException(); - iie.initCause(e); - throw iie; - } - } - } - } finally { - WriterAndPath wap = null; - for (SinkWriter tmpWAP : writers.values()) { - try { - wap = (WriterAndPath) tmpWAP; - wap.w.close(); - } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); - thrown.add(ioe); - continue; - } - LOG.info( - "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent - / 1000 / 1000) + "ms)"); - } - writersClosed = true; - } - - return thrown; - } - - /** - * Get a writer and path for a log starting at the given entry. This function is threadsafe so - * long as multiple threads are always acting on different regions. - * @return null if this region shouldn't output any logs - */ - WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException { - byte region[] = entry.getKey().getEncodedRegionName(); - String regionName = Bytes.toString(region); - WriterAndPath ret = (WriterAndPath) writers.get(regionName); - if (ret != null) { - return ret; - } - // If we already decided that this region doesn't get any output - // we don't need to check again. - if (blacklistedRegions.contains(region)) { - return null; - } - ret = createWAP(region, entry); - if (ret == null) { - blacklistedRegions.add(region); - return null; - } - if(reusable) { - writers.put(regionName, ret); - } - return ret; - } - - /** - * @return a path with a write for that path. caller should close. - */ - WriterAndPath createWAP(byte[] region, Entry entry) throws IOException { - String tmpDirName = conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path regionedits = getRegionSplitEditsPath(entry, - fileBeingSplit.getPath().getName(), tmpDirName, conf); - if (regionedits == null) { - return null; - } - FileSystem walFs = FSUtils.getWALFileSystem(conf); - if (walFs.exists(regionedits)) { - LOG.warn("Found old edits file. It could be the " - + "result of a previous failed split attempt. Deleting " + regionedits + ", length=" - + walFs.getFileStatus(regionedits).getLen()); - if (!walFs.delete(regionedits, false)) { - LOG.warn("Failed delete of old {}", regionedits); - } - } - Writer w = createWriter(regionedits); - LOG.debug("Creating writer path={}", regionedits); - return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId()); - } - - void filterCellByStore(Entry logEntry) { - Map maxSeqIdInStores = - regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); - if (MapUtils.isEmpty(maxSeqIdInStores)) { - return; - } - // Create the array list for the cells that aren't filtered. - // We make the assumption that most cells will be kept. 
- ArrayList keptCells = new ArrayList<>(logEntry.getEdit().getCells().size()); - for (Cell cell : logEntry.getEdit().getCells()) { - if (CellUtil.matchingFamily(cell, WALEdit.METAFAMILY)) { - keptCells.add(cell); - } else { - byte[] family = CellUtil.cloneFamily(cell); - Long maxSeqId = maxSeqIdInStores.get(family); - // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, - // or the master was crashed before and we can not get the information. - if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) { - keptCells.add(cell); - } - } - } - - // Anything in the keptCells array list is still live. - // So rather than removing the cells from the array list - // which would be an O(n^2) operation, we just replace the list - logEntry.getEdit().setCells(keptCells); - } - - @Override - public void append(RegionEntryBuffer buffer) throws IOException { - appendBuffer(buffer, true); - } - - WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{ - List entries = buffer.entryBuffer; - if (entries.isEmpty()) { - LOG.warn("got an empty buffer, skipping"); - return null; - } - - WriterAndPath wap = null; - - long startTime = System.nanoTime(); - try { - int editsCount = 0; - - for (Entry logEntry : entries) { - if (wap == null) { - wap = getWriterAndPath(logEntry, reusable); - if (wap == null) { - if (LOG.isTraceEnabled()) { - // This log spews the full edit. Can be massive in the log. Enable only debugging - // WAL lost edit issues. - LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry); - } - return null; - } - } - filterCellByStore(logEntry); - if (!logEntry.getEdit().isEmpty()) { - wap.w.append(logEntry); - this.updateRegionMaximumEditLogSeqNum(logEntry); - editsCount++; - } else { - wap.incrementSkippedEdits(1); - } - } - // Pass along summary statistics - wap.incrementEdits(editsCount); - wap.incrementNanoTime(System.nanoTime() - startTime); - } catch (IOException e) { - e = e instanceof RemoteException ? - ((RemoteException)e).unwrapRemoteException() : e; - LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e); - throw e; - } - return wap; - } - - @Override - public boolean keepRegionEvent(Entry entry) { - ArrayList cells = entry.getEdit().getCells(); - for (Cell cell : cells) { - if (WALEdit.isCompactionMarker(cell)) { - return true; - } - } - return false; - } - - /** - * @return a map from encoded region ID to the number of edits written out for that region. 
- */ - @Override - public Map<byte[], Long> getOutputCounts() { - TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) { - ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten); - } - return ret; - } - - @Override - public int getNumberOfRecoveredRegions() { - return writers.size(); - } - } - - /** - * - */ - class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink { - - private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>(); - - public BoundedLogWriterCreationOutputSink(PipelineController controller, - EntryBuffers entryBuffers, int numWriters) { - super(controller, entryBuffers, numWriters); - } - - @Override - public List<Path> finishWritingAndClose() throws IOException { - boolean isSuccessful; - List<Path> result; - try { - isSuccessful = finishWriting(false); - } finally { - result = close(); - } - if (isSuccessful) { - splits = result; - } - return splits; - } - - @Override - boolean executeCloseTask(CompletionService<Void> completionService, - List<IOException> thrown, List<Path> paths) - throws InterruptedException, ExecutionException { - for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) { - LOG.info("Submitting writeThenClose of {}", - Arrays.toString(buffer.getValue().encodedRegionName)); - completionService.submit(new Callable<Void>() { - @Override - public Void call() throws Exception { - Path dst = writeThenClose(buffer.getValue()); - paths.add(dst); - return null; - } - }); - } - boolean progress_failed = false; - for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { - Future<Void> future = completionService.take(); - future.get(); - if (!progress_failed && reporter != null && !reporter.progress()) { - progress_failed = true; - } - } - - return progress_failed; - } - - /** - * since the splitting process may create multiple output files, we need a map - * regionRecoverStatMap to track the output count of each region. - * @return a map from encoded region ID to the number of edits written out for that region.
- */ - @Override - public Map<byte[], Long> getOutputCounts() { - Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>(); - for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){ - regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue()); - } - return regionRecoverStatMapResult; - } - - /** - * @return the number of recovered regions - */ - @Override - public int getNumberOfRecoveredRegions() { - return regionRecoverStatMap.size(); - } - - /** - * Append the buffer to a new recovered edits file, then close it after all done - * @param buffer contain all entries of a certain region - * @throws IOException when closeWriter failed - */ - @Override - public void append(RegionEntryBuffer buffer) throws IOException { - writeThenClose(buffer); - } - - private Path writeThenClose(RegionEntryBuffer buffer) throws IOException { - WriterAndPath wap = appendBuffer(buffer, false); - if(wap != null) { - String encodedRegionName = Bytes.toString(buffer.encodedRegionName); - Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten); - if (value != null) { - Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten; - regionRecoverStatMap.put(encodedRegionName, newValue); - } - } - - Path dst = null; - List<IOException> thrown = new ArrayList<>(); - if(wap != null){ - dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown); - } - if (!thrown.isEmpty()) { - throw MultipleIOException.createIOException(thrown); - } - return dst; - } - } - /** * Class wraps the actual writer which writes data out and related statistics */ @@ -1771,14 +551,14 @@ public class WALSplitter { * Private data structure that wraps a Writer and its Path, also collecting statistics about the * data written to this output. */ - private final static class WriterAndPath extends SinkWriter { - final Path p; - final Writer w; + final static class WriterAndPath extends SinkWriter { + final Path path; + final Writer writer; final long minLogSeqNum; - WriterAndPath(final Path p, final Writer w, final long minLogSeqNum) { - this.p = p; - this.w = w; + WriterAndPath(final Path path, final Writer writer, final long minLogSeqNum) { + this.path = path; + this.writer = writer; this.minLogSeqNum = minLogSeqNum; } } @@ -1790,125 +570,4 @@ public class WALSplitter { super(s); } } - - /** A struct used by getMutationsFromWALEntry */ - public static class MutationReplay implements Comparable<MutationReplay> { - public MutationReplay(MutationType type, Mutation mutation, long nonceGroup, long nonce) { - this.type = type; - this.mutation = mutation; - if(this.mutation.getDurability() != Durability.SKIP_WAL) { - // using ASYNC_WAL for relay - this.mutation.setDurability(Durability.ASYNC_WAL); - } - this.nonceGroup = nonceGroup; - this.nonce = nonce; - } - - public final MutationType type; - public final Mutation mutation; - public final long nonceGroup; - public final long nonce; - - @Override - public int compareTo(final MutationReplay d) { - return this.mutation.compareTo(d.mutation); - } - - @Override - public boolean equals(Object obj) { - if(!(obj instanceof MutationReplay)) { - return false; - } else { - return this.compareTo((MutationReplay)obj) == 0; - } - } - - @Override - public int hashCode() { - return this.mutation.hashCode(); - } - } - - /** - * This function is used to construct mutations from a WALEntry.
It also - * reconstructs WALKey & WALEdit from the passed in WALEntry - * @param entry - * @param cells - * @param logEntry pair of WALKey and WALEdit instance stores WALKey and WALEdit instances - * extracted from the passed in WALEntry. - * @return list of Pair<MutationType, Mutation> to be replayed - * @throws IOException - */ - public static List<MutationReplay> getMutationsFromWALEntry(WALEntry entry, CellScanner cells, - Pair<WALKeyImpl, WALEdit> logEntry, Durability durability) throws IOException { - if (entry == null) { - // return an empty array - return Collections.emptyList(); - } - - long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ? - entry.getKey().getOrigSequenceNumber() : entry.getKey().getLogSequenceNumber(); - int count = entry.getAssociatedCellCount(); - List<MutationReplay> mutations = new ArrayList<>(); - Cell previousCell = null; - Mutation m = null; - WALKeyImpl key = null; - WALEdit val = null; - if (logEntry != null) { - val = new WALEdit(); - } - - for (int i = 0; i < count; i++) { - // Throw index out of bounds if our cell count is off - if (!cells.advance()) { - throw new ArrayIndexOutOfBoundsException("Expected=" + count + ", index=" + i); - } - Cell cell = cells.current(); - if (val != null) val.add(cell); - - boolean isNewRowOrType = - previousCell == null || previousCell.getTypeByte() != cell.getTypeByte() - || !CellUtil.matchingRows(previousCell, cell); - if (isNewRowOrType) { - // Create new mutation - if (CellUtil.isDelete(cell)) { - m = new Delete(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - // Deletes don't have nonces. - mutations.add(new MutationReplay( - MutationType.DELETE, m, HConstants.NO_NONCE, HConstants.NO_NONCE)); - } else { - m = new Put(cell.getRowArray(), cell.getRowOffset(), cell.getRowLength()); - // Puts might come from increment or append, thus we need nonces. - long nonceGroup = entry.getKey().hasNonceGroup() - ? entry.getKey().getNonceGroup() : HConstants.NO_NONCE; - long nonce = entry.getKey().hasNonce() ?
entry.getKey().getNonce() : HConstants.NO_NONCE; - mutations.add(new MutationReplay(MutationType.PUT, m, nonceGroup, nonce)); - } - } - if (CellUtil.isDelete(cell)) { - ((Delete) m).add(cell); - } else { - ((Put) m).add(cell); - } - m.setDurability(durability); - previousCell = cell; - } - - // reconstruct WALKey - if (logEntry != null) { - org.apache.hadoop.hbase.shaded.protobuf.generated.WALProtos.WALKey walKeyProto = - entry.getKey(); - List clusterIds = new ArrayList<>(walKeyProto.getClusterIdsCount()); - for (HBaseProtos.UUID uuid : entry.getKey().getClusterIdsList()) { - clusterIds.add(new UUID(uuid.getMostSigBits(), uuid.getLeastSigBits())); - } - key = new WALKeyImpl(walKeyProto.getEncodedRegionName().toByteArray(), TableName.valueOf( - walKeyProto.getTableName().toByteArray()), replaySeqId, walKeyProto.getWriteTime(), - clusterIds, walKeyProto.getNonceGroup(), walKeyProto.getNonce(), null); - logEntry.setFirst(key); - logEntry.setSecond(val); - } - - return mutations; - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java index 084dd893be6..8087ae8776e 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/AbstractTestDLS.java @@ -81,7 +81,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.hadoop.hbase.zookeeper.ZKUtil; import org.apache.hadoop.hbase.zookeeper.ZKWatcher; import org.junit.After; @@ -226,14 +226,14 @@ public abstract class AbstractTestDLS { for (RegionInfo hri : regions) { Path tdir = FSUtils.getWALTableDir(conf, table); @SuppressWarnings("deprecation") - Path editsdir = WALSplitter + Path editsdir = WALSplitUtil .getRegionDirRecoveredEditsDir(FSUtils.getWALRegionDir(conf, tableName, hri.getEncodedName())); LOG.debug("checking edits dir " + editsdir); FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { @Override public boolean accept(Path p) { - if (WALSplitter.isSequenceIdFile(p)) { + if (WALSplitUtil.isSequenceIdFile(p)) { return false; } return true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java index d5ec62db376..0e90246eff3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/master/procedure/TestDeleteColumnFamilyProcedureFromClient.java @@ -39,7 +39,7 @@ import org.apache.hadoop.hbase.testclassification.LargeTests; import org.apache.hadoop.hbase.testclassification.MasterTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -163,7 +163,7 @@ public class TestDeleteColumnFamilyProcedureFromClient { FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() { @Override public boolean accept(Path p) { - if 
(WALSplitter.isSequenceIdFile(p)) { + if (WALSplitUtil.isSequenceIdFile(p)) { return false; } return true; @@ -244,7 +244,7 @@ public class TestDeleteColumnFamilyProcedureFromClient { FileStatus[] cf = fs.listStatus(fileStatus[i].getPath(), new PathFilter() { @Override public boolean accept(Path p) { - if (WALSplitter.isSequenceIdFile(p)) { + if (WALSplitUtil.isSequenceIdFile(p)) { return false; } return true; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java index c09f70201bf..514d0d64a7a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegion.java @@ -161,7 +161,7 @@ import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; import org.apache.hadoop.hbase.wal.WALProvider.Writer; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -687,7 +687,7 @@ public class TestHRegion { FileSystem fs = region.getRegionFileSystem().getFileSystem(); byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); - Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); + Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); long maxSeqId = 1050; long minSeqId = 1000; @@ -738,7 +738,7 @@ public class TestHRegion { FileSystem fs = region.getRegionFileSystem().getFileSystem(); byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); - Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); + Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); long maxSeqId = 1050; long minSeqId = 1000; @@ -791,7 +791,7 @@ public class TestHRegion { Path regiondir = region.getRegionFileSystem().getRegionDir(); FileSystem fs = region.getRegionFileSystem().getFileSystem(); - Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); + Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); for (int i = 1000; i < 1050; i += 10) { Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", i)); FSDataOutputStream dos = fs.create(recoveredEdits); @@ -824,7 +824,7 @@ public class TestHRegion { assertEquals(0, region.getStoreFileList(columns).size()); - Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); + Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); long maxSeqId = 1050; long minSeqId = 1000; @@ -940,7 +940,7 @@ public class TestHRegion { WALUtil.writeCompactionMarker(region.getWAL(), this.region.getReplicationScope(), this.region.getRegionInfo(), compactionDescriptor, region.getMVCC()); - Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); + Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); fs.create(recoveredEdits); @@ -1065,7 +1065,7 @@ public class TestHRegion { // now write those markers to the recovered edits again. 
- Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); + Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); Path recoveredEdits = new Path(recoveredEditsDir, String.format("%019d", 1000)); fs.create(recoveredEdits); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java index 98119db83fc..ae4154fb7a2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestHRegionReplayEvents.java @@ -77,7 +77,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; -import org.apache.hadoop.hbase.wal.WALSplitter.MutationReplay; +import org.apache.hadoop.hbase.wal.WALSplitUtil.MutationReplay; import org.apache.hadoop.util.StringUtils; import org.junit.After; import org.junit.AfterClass; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java index 4a328209c61..c3f6d045438 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEdits.java @@ -52,7 +52,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKey; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.junit.BeforeClass; import org.junit.ClassRule; import org.junit.Rule; @@ -149,7 +149,7 @@ public class TestRecoveredEdits { assertTrue(storeFiles.isEmpty()); region.close(); Path regionDir = region.getRegionDir(hbaseRootDir, hri); - Path recoveredEditsDir = WALSplitter.getRegionDirRecoveredEditsDir(regionDir); + Path recoveredEditsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir); // This is a little fragile getting this path to a file of 10M of edits. 
Path recoveredEditsFile = new Path( System.getProperty("test.build.classes", "target/test-classes"), diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java index e55f3eea172..d9f77660f26 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRecoveredEditsReplayAndAbort.java @@ -46,7 +46,7 @@ import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; import org.apache.hadoop.hbase.wal.WALProvider; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -146,7 +146,7 @@ public class TestRecoveredEditsReplayAndAbort { FileSystem fs = region.getRegionFileSystem().getFileSystem(); byte[] regionName = region.getRegionInfo().getEncodedNameAsBytes(); - Path recoveredEditsDir = WALSplitter + Path recoveredEditsDir = WALSplitUtil .getRegionDirRecoveredEditsDir(regiondir); long maxSeqId = 1200; long minSeqId = 1000; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java index 76633132f5c..16c583734f8 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/AbstractTestWALReplay.java @@ -96,6 +96,7 @@ import org.apache.hadoop.hbase.wal.WAL; import org.apache.hadoop.hbase.wal.WALEdit; import org.apache.hadoop.hbase.wal.WALFactory; import org.apache.hadoop.hbase.wal.WALKeyImpl; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.apache.hadoop.hbase.wal.WALSplitter; import org.apache.hadoop.hdfs.DFSInputStream; import org.junit.After; @@ -902,15 +903,12 @@ public abstract class AbstractTestWALReplay { assertTrue(listStatus.length > 0); WALSplitter.splitLogFile(hbaseRootDir, listStatus[0], this.fs, this.conf, null, null, null, wals); - FileStatus[] listStatus1 = this.fs.listStatus( - new Path(FSUtils.getWALTableDir(conf, tableName), new Path(hri.getEncodedName(), - "recovered.edits")), new PathFilter() { + FileStatus[] listStatus1 = this.fs.listStatus(new Path(FSUtils.getWALTableDir(conf, tableName), + new Path(hri.getEncodedName(), "recovered.edits")), + new PathFilter() { @Override public boolean accept(Path p) { - if (WALSplitter.isSequenceIdFile(p)) { - return false; - } - return true; + return !WALSplitUtil.isSequenceIdFile(p); } }); int editCount = 0; @@ -956,7 +954,7 @@ public abstract class AbstractTestWALReplay { runWALSplit(this.conf); // here we let the DFSInputStream throw an IOException just after the WALHeader. 
- Path editFile = WALSplitter.getSplitEditFilesSorted(this.fs, regionDir).first(); + Path editFile = WALSplitUtil.getSplitEditFilesSorted(this.fs, regionDir).first(); FSDataInputStream stream = fs.open(editFile); stream.seek(ProtobufLogReader.PB_WAL_MAGIC.length); Class logReaderClass = diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java index acc1f55003b..3a21405f117 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/snapshot/TestRestoreSnapshotHelper.java @@ -47,7 +47,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CommonFSUtils; import org.apache.hadoop.hbase.util.FSTableDescriptors; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.WALSplitter; +import org.apache.hadoop.hbase.wal.WALSplitUtil; import org.junit.After; import org.junit.AfterClass; import org.junit.Assert; @@ -167,13 +167,13 @@ public class TestRestoreSnapshotHelper { region.initialize(); Path recoveredEdit = FSUtils.getWALRegionDir(conf, tableName, region.getRegionInfo().getEncodedName()); - long maxSeqId = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit); + long maxSeqId = WALSplitUtil.getMaxRegionSequenceId(fs, recoveredEdit); // open restored region without set restored flag HRegion region2 = HRegion.newHRegion(FSUtils.getTableDir(restoreDir, tableName), null, fs, conf, restoredRegion, htd, null); region2.initialize(); - long maxSeqId2 = WALSplitter.getMaxRegionSequenceId(fs, recoveredEdit); + long maxSeqId2 = WALSplitUtil.getMaxRegionSequenceId(fs, recoveredEdit); Assert.assertTrue(maxSeqId2 > maxSeqId); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java index 8ae638ce9a4..6386112835b 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestReadWriteSeqIdFiles.java @@ -66,30 +66,30 @@ public class TestReadWriteSeqIdFiles { @Test public void test() throws IOException { - WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L); - assertEquals(1000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR)); - WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L); - assertEquals(2000L, WALSplitter.getMaxRegionSequenceId(walFS, REGION_DIR)); + WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 1000L); + assertEquals(1000L, WALSplitUtil.getMaxRegionSequenceId(walFS, REGION_DIR)); + WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 2000L); + assertEquals(2000L, WALSplitUtil.getMaxRegionSequenceId(walFS, REGION_DIR)); // can not write a sequence id which is smaller try { - WALSplitter.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L); + WALSplitUtil.writeRegionSequenceIdFile(walFS, REGION_DIR, 1500L); } catch (IOException e) { // expected LOG.info("Expected error", e); } - Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(REGION_DIR); + Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(REGION_DIR); FileStatus[] files = FSUtils.listStatus(walFS, editsdir, new PathFilter() { @Override public boolean accept(Path p) { - return WALSplitter.isSequenceIdFile(p); + return WALSplitUtil.isSequenceIdFile(p); } }); // only one 
seqid file should exist assertEquals(1, files.length); // verify all seqId files aren't treated as recovered.edits files - NavigableSet recoveredEdits = WALSplitter.getSplitEditFilesSorted(walFS, REGION_DIR); + NavigableSet recoveredEdits = WALSplitUtil.getSplitEditFilesSorted(walFS, REGION_DIR); assertEquals(0, recoveredEdits.size()); } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java index b20b3a59024..741d4492198 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java @@ -39,7 +39,6 @@ import org.apache.hadoop.hbase.testclassification.RegionServerTests; import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.wal.WALSplitter.EntryBuffers; import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; import org.junit.ClassRule; @@ -89,17 +88,17 @@ public class TestWALMethods { Path regiondir = util.getDataTestDir("regiondir"); fs.delete(regiondir, true); fs.mkdirs(regiondir); - Path recoverededits = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); - String first = WALSplitter.formatRecoveredEditsFileName(-1); + Path recoverededits = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); + String first = WALSplitUtil.formatRecoveredEditsFileName(-1); createFile(fs, recoverededits, first); - createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(0)); - createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(1)); - createFile(fs, recoverededits, WALSplitter + createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(0)); + createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(1)); + createFile(fs, recoverededits, WALSplitUtil .formatRecoveredEditsFileName(11)); - createFile(fs, recoverededits, WALSplitter.formatRecoveredEditsFileName(2)); - createFile(fs, recoverededits, WALSplitter + createFile(fs, recoverededits, WALSplitUtil.formatRecoveredEditsFileName(2)); + createFile(fs, recoverededits, WALSplitUtil .formatRecoveredEditsFileName(50)); - String last = WALSplitter.formatRecoveredEditsFileName(Long.MAX_VALUE); + String last = WALSplitUtil.formatRecoveredEditsFileName(Long.MAX_VALUE); createFile(fs, recoverededits, last); createFile(fs, recoverededits, Long.toString(Long.MAX_VALUE) + "." 
+ System.currentTimeMillis()); @@ -108,21 +107,21 @@ public class TestWALMethods { FSUtils.setRootDir(walConf, regiondir); (new WALFactory(walConf, "dummyLogName")).getWAL(null); - NavigableSet files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); + NavigableSet files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir); assertEquals(7, files.size()); assertEquals(files.pollFirst().getName(), first); assertEquals(files.pollLast().getName(), last); assertEquals(files.pollFirst().getName(), - WALSplitter + WALSplitUtil .formatRecoveredEditsFileName(0)); assertEquals(files.pollFirst().getName(), - WALSplitter + WALSplitUtil .formatRecoveredEditsFileName(1)); assertEquals(files.pollFirst().getName(), - WALSplitter + WALSplitUtil .formatRecoveredEditsFileName(2)); assertEquals(files.pollFirst().getName(), - WALSplitter + WALSplitUtil .formatRecoveredEditsFileName(11)); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index b4e25336a89..696ddea1da2 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -399,7 +399,7 @@ public class TestWALSplit { Path regiondir = new Path(tdir, RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedName()); fs.mkdirs(regiondir); - Path parent = WALSplitter.getRegionDirRecoveredEditsDir(regiondir); + Path parent = WALSplitUtil.getRegionDirRecoveredEditsDir(regiondir); assertEquals(HConstants.RECOVERED_EDITS_DIR, parent.getName()); fs.createNewFile(parent); // create a recovered.edits file String parentOfParent = p.getParent().getParent().getName(); @@ -414,7 +414,7 @@ public class TestWALSplit { new Entry(new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); - Path p = WALSplitter.getRegionSplitEditsPath(entry, + Path p = WALSplitUtil.getRegionSplitEditsPath(entry, FILENAME_BEING_SPLIT, TMPDIRNAME, conf); return p; } @@ -422,10 +422,10 @@ public class TestWALSplit { @Test public void testHasRecoveredEdits() throws IOException { Path p = createRecoveredEditsPathForRegion(); - assertFalse(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); + assertFalse(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); String renamedEdit = p.getName().split("-")[0]; fs.createNewFile(new Path(p.getParent(), renamedEdit)); - assertTrue(WALSplitter.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); + assertTrue(WALSplitUtil.hasRecoveredEdits(conf, RegionInfoBuilder.FIRST_META_REGIONINFO)); } private void useDifferentDFSClient() throws IOException { @@ -1157,7 +1157,7 @@ public class TestWALSplit { // After creating writer, simulate region's // replayRecoveredEditsIfAny() which gets SplitEditFiles of this // region and delete them, excluding files with '.temp' suffix. 
- NavigableSet files = WALSplitter.getSplitEditFilesSorted(fs, regiondir); + NavigableSet files = WALSplitUtil.getSplitEditFilesSorted(fs, regiondir); if (files != null && !files.isEmpty()) { for (Path file : files) { if (!this.walFS.delete(file, false)) { @@ -1243,12 +1243,12 @@ public class TestWALSplit { throws IOException { Path tdir = FSUtils.getWALTableDir(conf, table); @SuppressWarnings("deprecation") - Path editsdir = WALSplitter.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, + Path editsdir = WALSplitUtil.getRegionDirRecoveredEditsDir(HRegion.getRegionDir(tdir, Bytes.toString(Bytes.toBytes(region)))); FileStatus[] files = fs.listStatus(editsdir, new PathFilter() { @Override public boolean accept(Path p) { - if (WALSplitter.isSequenceIdFile(p)) { + if (WALSplitUtil.isSequenceIdFile(p)) { return false; } return true;
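The test changes above repeatedly resolve a region's recovered.edits directory with WALSplitUtil.getRegionDirRecoveredEditsDir and then list it with a PathFilter that drops the sequence-id marker files via WALSplitUtil.isSequenceIdFile. The following is a minimal, self-contained sketch of that pattern, not code from this patch; the class and method names are hypothetical, and only the two WALSplitUtil calls are taken from the refactor:

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.wal.WALSplitUtil;

public class RecoveredEditsLister {
  /** Lists recovered.edits files under the given region directory, skipping seqid marker files. */
  static List<Path> listRecoveredEdits(FileSystem fs, Path regionDir) throws IOException {
    Path editsDir = WALSplitUtil.getRegionDirRecoveredEditsDir(regionDir);
    List<Path> result = new ArrayList<>();
    // PathFilter is a functional interface, so the anonymous classes used in the tests
    // can be written as a lambda that excludes sequence-id files.
    for (FileStatus status : fs.listStatus(editsDir, p -> !WALSplitUtil.isSequenceIdFile(p))) {
      result.add(status.getPath());
    }
    return result;
  }
}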
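BoundedLogWriterCreationOutputSink.writeThenClose, shown earlier in this patch, keeps a running count of edits written per region in regionRecoverStatMap by calling putIfAbsent and then re-reading and re-putting the sum. As an illustrative sketch only, assuming nothing beyond the JDK (the class and method names below are hypothetical, not part of the patch), the same per-region tally can be expressed with ConcurrentHashMap.merge, which folds the read-modify-write into a single atomic update:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class RegionEditCountTally {
  private final ConcurrentMap<String, Long> editsPerRegion = new ConcurrentHashMap<>();

  /** Adds editsWritten to the running total for the given encoded region name. */
  void add(String encodedRegionName, long editsWritten) {
    // merge() reads the current value, applies Long::sum and stores the result atomically,
    // so concurrent close tasks cannot lose an update.
    editsPerRegion.merge(encodedRegionName, editsWritten, Long::sum);
  }

  /** Returns the total edits recorded for the region, or 0 if none were written. */
  long get(String encodedRegionName) {
    return editsPerRegion.getOrDefault(encodedRegionName, 0L);
  }
}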