HBASE-24577 Doc WALSplitter classes (#1913)

Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
Michael Stack 2020-06-17 14:59:30 -07:00 committed by stack
parent 192daded61
commit 6a576696dc
5 changed files with 60 additions and 34 deletions

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -18,7 +18,6 @@
package org.apache.hadoop.hbase.wal; package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME; import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
import java.io.IOException; import java.io.IOException;
import java.io.InterruptedIOException; import java.io.InterruptedIOException;
import java.util.HashMap; import java.util.HashMap;
@ -29,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellComparatorImpl; import org.apache.hadoop.hbase.CellComparatorImpl;
@ -51,13 +49,16 @@ import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
/**
* A WALSplitter sink that outputs {@link org.apache.hadoop.hbase.io.hfile.HFile}s.
* Runs with a bounded number of HFile writers at any one time rather than let the count run up.
* @see BoundedRecoveredEditsOutputSink for a sink implementation that writes intermediate
* recovered.edits files.
*/
@InterfaceAudience.Private @InterfaceAudience.Private
public class BoundedRecoveredHFilesOutputSink extends OutputSink { public class BoundedRecoveredHFilesOutputSink extends OutputSink {
private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class); private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);
public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
private final WALSplitter walSplitter; private final WALSplitter walSplitter;
// Since the splitting process may create multiple output files, we need a map // Since the splitting process may create multiple output files, we need a map
@ -80,6 +81,8 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
Map<String, CellSet> familyCells = new HashMap<>(); Map<String, CellSet> familyCells = new HashMap<>();
Map<String, Long> familySeqIds = new HashMap<>(); Map<String, Long> familySeqIds = new HashMap<>();
boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME); boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
// First iterate all Cells to find which column families are present and to stamp Cell with
// sequence id.
for (WAL.Entry entry : buffer.entryBuffer) { for (WAL.Entry entry : buffer.entryBuffer) {
long seqId = entry.getKey().getSequenceId(); long seqId = entry.getKey().getSequenceId();
List<Cell> cells = entry.getEdit().getCells(); List<Cell> cells = entry.getEdit().getCells();
@ -99,12 +102,13 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
} }
} }
// The key point is create a new writer for each column family, write edits then close writer. // Create a new hfile writer for each column family, write edits then close writer.
String regionName = Bytes.toString(buffer.encodedRegionName); String regionName = Bytes.toString(buffer.encodedRegionName);
for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) { for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
String familyName = cellsEntry.getKey(); String familyName = cellsEntry.getKey();
StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName, StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
familySeqIds.get(familyName), familyName, isMetaTable); familySeqIds.get(familyName), familyName, isMetaTable);
LOG.trace("Created {}", writer.getPath());
openingWritersNum.incrementAndGet(); openingWritersNum.incrementAndGet();
try { try {
for (Cell cell : cellsEntry.getValue()) { for (Cell cell : cellsEntry.getValue()) {
@ -118,6 +122,7 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
openingWritersNum.decrementAndGet(); openingWritersNum.decrementAndGet();
} finally { } finally {
writer.close(); writer.close();
LOG.trace("Closed {}, edits={}", writer.getPath(), familyCells.size());
} }
} }
} }

View File

@ -56,6 +56,9 @@ abstract class OutputSink {
protected final AtomicLong totalSkippedEdits = new AtomicLong(); protected final AtomicLong totalSkippedEdits = new AtomicLong();
/**
* List of all the files produced by this sink
*/
protected final List<Path> splits = new ArrayList<>(); protected final List<Path> splits = new ArrayList<>();
/** /**

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Bytes;
@ -34,12 +33,15 @@ import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists; import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
/** /**
* Class that manages the output streams from the log splitting process. * Class that manages the output streams from the log splitting process.
* Every region only has one recovered edits. * Every region only has one recovered edits file PER split WAL (if we split
* multiple WALs during a log-splitting session, on open, a Region may
* have multiple recovered.edits files to replay -- one per split WAL).
* @see BoundedRecoveredEditsOutputSink which is like this class but imposes upper bound on
* the number of writers active at one time (makes for better throughput).
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink { class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
@ -81,6 +83,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
if (ret == null) { if (ret == null) {
return null; return null;
} }
LOG.trace("Created {}", ret.path);
writers.put(Bytes.toString(region), ret); writers.put(Bytes.toString(region), ret);
return ret; return ret;
} }
@ -106,6 +109,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
for (RecoveredEditsWriter writer : writers.values()) { for (RecoveredEditsWriter writer : writers.values()) {
closeCompletionService.submit(() -> { closeCompletionService.submit(() -> {
Path dst = closeRecoveredEditsWriter(writer, thrown); Path dst = closeRecoveredEditsWriter(writer, thrown);
LOG.trace("Closed {}", dst);
splits.add(dst); splits.add(dst);
return null; return null;
}); });

View File

@ -1,4 +1,4 @@
/** /*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -17,10 +17,7 @@
*/ */
package org.apache.hadoop.hbase.wal; package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.DEFAULT_WAL_SPLIT_TO_HFILE;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile; import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
import java.io.EOFException; import java.io.EOFException;
import java.io.FileNotFoundException; import java.io.FileNotFoundException;
import java.io.IOException; import java.io.IOException;
@ -63,18 +60,19 @@ import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience; import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
/** /**
* This class is responsible for splitting up a bunch of regionserver commit log * Split RegionServer WAL files. Splits the WAL into new files,
* files that are no longer being written to, into new files, one per region, for * one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
* recovering data on startup. Delete the old log files when finished. * See {@link #split(Path, Path, Path, FileSystem, Configuration, WALFactory)} or
* {@link #splitLogFile(Path, FileStatus, FileSystem, Configuration, CancelableProgressable,
* LastSequenceId, SplitLogWorkerCoordination, WALFactory, RegionServerServices)} for
* entry-point.
*/ */
@InterfaceAudience.Private @InterfaceAudience.Private
public class WALSplitter { public class WALSplitter {
@ -97,7 +95,12 @@ public class WALSplitter {
OutputSink outputSink; OutputSink outputSink;
private EntryBuffers entryBuffers; private EntryBuffers entryBuffers;
/**
* Coordinator for split log. Used by the zk-based log splitter.
* Not used by the procedure v2-based log splitter.
*/
private SplitLogWorkerCoordination splitLogWorkerCoordination; private SplitLogWorkerCoordination splitLogWorkerCoordination;
private final WALFactory walFactory; private final WALFactory walFactory;
private MonitoredTask status; private MonitoredTask status;
@ -116,7 +119,20 @@ public class WALSplitter {
private final String tmpDirName; private final String tmpDirName;
/**
* Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files.
*/
public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
/**
* True if we are to run with bounded amount of writers rather than let the count blossom.
* Default is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that
* is always bounded. Only applies when you are doing recovery to 'recovered.edits'
* files (the old default). Bounded writing tends to have higher throughput.
*/
public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize"; public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
public final static String SPLIT_WAL_WRITER_THREADS = public final static String SPLIT_WAL_WRITER_THREADS =
"hbase.regionserver.hlog.splitlog.writer.threads"; "hbase.regionserver.hlog.splitlog.writer.threads";
@ -185,11 +201,7 @@ public class WALSplitter {
} }
/** /**
* Splits a WAL file into region's recovered-edits directory. * Splits a WAL file.
* This is the main entry point for distributed log splitting from SplitLogWorker.
* <p>
* If the log file has N regions then N recovered.edits files will be produced.
* <p>
* @return false if it is interrupted by the progress-able. * @return false if it is interrupted by the progress-able.
*/ */
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS, public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
@ -203,10 +215,13 @@ public class WALSplitter {
return s.splitLogFile(logfile, reporter); return s.splitLogFile(logfile, reporter);
} }
// A wrapper to split one log folder using the method used by distributed /**
// log splitting. Used by tools and unit tests. It should be package private. * Split a folder of WAL files. Delete the directory when done.
// It is public only because TestWALObserver is in a different package, * Used by tools and unit tests. It should be package private.
// which uses this method to do log splitting. * It is public only because TestWALObserver is in a different package,
* which uses this method to do log splitting.
* @return List of output files created by the split.
*/
@VisibleForTesting @VisibleForTesting
public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS, public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
Configuration conf, final WALFactory factory) throws IOException { Configuration conf, final WALFactory factory) throws IOException {
@ -234,7 +249,7 @@ public class WALSplitter {
} }
/** /**
* log splitting implementation, splits one log file. * WAL splitting implementation, splits one log file.
* @param logfile should be an actual log file. * @param logfile should be an actual log file.
*/ */
@VisibleForTesting @VisibleForTesting
@ -286,7 +301,8 @@ public class WALSplitter {
String encodedRegionNameAsStr = Bytes.toString(region); String encodedRegionNameAsStr = Bytes.toString(region);
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr); lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
if (lastFlushedSequenceId == null) { if (lastFlushedSequenceId == null) {
if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr))) { if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
encodedRegionNameAsStr))) {
// The region directory itself is not present in the FS. This indicates that // The region directory itself is not present in the FS. This indicates that
// the region/table is already removed. We can just skip all the edits for this // the region/table is already removed. We can just skip all the edits for this
// region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits // region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits

View File

@ -1,5 +1,4 @@
/* /*
*
* Licensed to the Apache Software Foundation (ASF) under one * Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file * or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information * distributed with this work for additional information
@ -19,13 +18,12 @@
package org.apache.hadoop.hbase.wal; package org.apache.hadoop.hbase.wal;
import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits; import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE; import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
import java.io.IOException; import java.io.IOException;
import java.security.PrivilegedExceptionAction; import java.security.PrivilegedExceptionAction;
import java.util.ArrayList; import java.util.ArrayList;