HBASE-24577 Doc WALSplitter classes (#1913)
Signed-off-by: Anoop Sam John <anoopsamjohn@apache.org> Signed-off-by: Viraj Jasani <vjasani@apache.org> Signed-off-by: Guanghao Zhang <zghao@apache.org>
This commit is contained in:
parent
e202c98fb0
commit
812d1e2bb5
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -18,7 +18,6 @@
|
|||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.apache.hadoop.hbase.TableName.META_TABLE_NAME;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
import java.util.HashMap;
|
||||
|
@ -29,7 +28,6 @@ import java.util.concurrent.ConcurrentMap;
|
|||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.Cell;
|
||||
import org.apache.hadoop.hbase.CellComparatorImpl;
|
||||
|
@ -51,13 +49,16 @@ import org.apache.yetus.audience.InterfaceAudience;
|
|||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
/**
|
||||
* A WALSplitter sink that outputs {@link org.apache.hadoop.hbase.io.hfile.HFile}s.
|
||||
* Runs with a bounded number of HFile writers at any one time rather than let the count run up.
|
||||
* @see BoundedRecoveredEditsOutputSink for a sink implementation that writes intermediate
|
||||
* recovered.edits files.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class BoundedRecoveredHFilesOutputSink extends OutputSink {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(BoundedRecoveredHFilesOutputSink.class);
|
||||
|
||||
public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
|
||||
public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
|
||||
|
||||
private final WALSplitter walSplitter;
|
||||
|
||||
// Since the splitting process may create multiple output files, we need a map
|
||||
|
@ -80,6 +81,8 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
|
|||
Map<String, CellSet> familyCells = new HashMap<>();
|
||||
Map<String, Long> familySeqIds = new HashMap<>();
|
||||
boolean isMetaTable = buffer.tableName.equals(META_TABLE_NAME);
|
||||
// First iterate all Cells to find which column families are present and to stamp Cell with
|
||||
// sequence id.
|
||||
for (WAL.Entry entry : buffer.entries) {
|
||||
long seqId = entry.getKey().getSequenceId();
|
||||
List<Cell> cells = entry.getEdit().getCells();
|
||||
|
@ -99,12 +102,13 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
|
|||
}
|
||||
}
|
||||
|
||||
// The key point is create a new writer for each column family, write edits then close writer.
|
||||
// Create a new hfile writer for each column family, write edits then close writer.
|
||||
String regionName = Bytes.toString(buffer.encodedRegionName);
|
||||
for (Map.Entry<String, CellSet> cellsEntry : familyCells.entrySet()) {
|
||||
String familyName = cellsEntry.getKey();
|
||||
StoreFileWriter writer = createRecoveredHFileWriter(buffer.tableName, regionName,
|
||||
familySeqIds.get(familyName), familyName, isMetaTable);
|
||||
LOG.trace("Created {}", writer.getPath());
|
||||
openingWritersNum.incrementAndGet();
|
||||
try {
|
||||
for (Cell cell : cellsEntry.getValue()) {
|
||||
|
@ -118,6 +122,7 @@ public class BoundedRecoveredHFilesOutputSink extends OutputSink {
|
|||
openingWritersNum.decrementAndGet();
|
||||
} finally {
|
||||
writer.close();
|
||||
LOG.trace("Closed {}, edits={}", writer.getPath(), familyCells.size());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -56,6 +56,9 @@ public abstract class OutputSink {
|
|||
|
||||
protected final AtomicLong totalSkippedEdits = new AtomicLong();
|
||||
|
||||
/**
|
||||
* List of all the files produced by this sink
|
||||
*/
|
||||
protected final List<Path> splits = new ArrayList<>();
|
||||
|
||||
/**
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentHashMap;
|
|||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.Future;
|
||||
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.hbase.TableName;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
|
@ -34,12 +33,15 @@ import org.apache.hadoop.io.MultipleIOException;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||
|
||||
/**
|
||||
* Class that manages the output streams from the log splitting process.
|
||||
* Every region only has one recovered edits.
|
||||
* Every region only has one recovered edits file PER split WAL (if we split
|
||||
* multiple WALs during a log-splitting session, on open, a Region may
|
||||
* have multiple recovered.edits files to replay -- one per split WAL).
|
||||
* @see BoundedRecoveredEditsOutputSink which is like this class but imposes upper bound on
|
||||
* the number of writers active at one time (makes for better throughput).
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
|
||||
|
@ -81,6 +83,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
|
|||
if (ret == null) {
|
||||
return null;
|
||||
}
|
||||
LOG.trace("Created {}", ret.path);
|
||||
writers.put(Bytes.toString(region), ret);
|
||||
return ret;
|
||||
}
|
||||
|
@ -106,6 +109,7 @@ class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
|
|||
for (RecoveredEditsWriter writer : writers.values()) {
|
||||
closeCompletionService.submit(() -> {
|
||||
Path dst = closeRecoveredEditsWriter(writer, thrown);
|
||||
LOG.trace("Closed {}", dst);
|
||||
splits.add(dst);
|
||||
return null;
|
||||
});
|
||||
|
|
|
@ -1,4 +1,4 @@
|
|||
/**
|
||||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -17,10 +17,7 @@
|
|||
*/
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.DEFAULT_WAL_SPLIT_TO_HFILE;
|
||||
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
|
||||
import static org.apache.hadoop.hbase.wal.WALSplitUtil.finishSplitLogFile;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
|
@ -63,17 +60,19 @@ import org.apache.hadoop.ipc.RemoteException;
|
|||
import org.apache.yetus.audience.InterfaceAudience;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
|
||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
|
||||
|
||||
/**
|
||||
* This class is responsible for splitting up a bunch of regionserver commit log
|
||||
* files that are no longer being written to, into new files, one per region, for
|
||||
* recovering data on startup. Delete the old log files when finished.
|
||||
* Split RegionServer WAL files. Splits the WAL into new files,
|
||||
* one per region, to be picked up on Region reopen. Deletes the split WAL when finished.
|
||||
* See {@link #split(Path, Path, Path, FileSystem, Configuration, WALFactory)} or
|
||||
* {@link #splitLogFile(Path, FileStatus, FileSystem, Configuration, CancelableProgressable,
|
||||
* LastSequenceId, SplitLogWorkerCoordination, WALFactory, RegionServerServices)} for
|
||||
* entry-point.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class WALSplitter {
|
||||
|
@ -96,7 +95,12 @@ public class WALSplitter {
|
|||
OutputSink outputSink;
|
||||
private EntryBuffers entryBuffers;
|
||||
|
||||
/**
|
||||
* Coordinator for split log. Used by the zk-based log splitter.
|
||||
* Not used by the procedure v2-based log splitter.
|
||||
*/
|
||||
private SplitLogWorkerCoordination splitLogWorkerCoordination;
|
||||
|
||||
private final WALFactory walFactory;
|
||||
|
||||
private MonitoredTask status;
|
||||
|
@ -115,7 +119,20 @@ public class WALSplitter {
|
|||
|
||||
private final String tmpDirName;
|
||||
|
||||
/**
|
||||
* Split WAL directly to hfiles instead of into intermediary 'recovered.edits' files.
|
||||
*/
|
||||
public static final String WAL_SPLIT_TO_HFILE = "hbase.wal.split.to.hfile";
|
||||
public static final boolean DEFAULT_WAL_SPLIT_TO_HFILE = false;
|
||||
|
||||
/**
|
||||
* True if we are to run with bounded amount of writers rather than let the count blossom.
|
||||
* Default is 'false'. Does not apply if you have set 'hbase.wal.split.to.hfile' as that
|
||||
* is always bounded. Only applies when you are doing recovery to 'recovered.edits'
|
||||
* files (the old default). Bounded writing tends to have higher throughput.
|
||||
*/
|
||||
public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
|
||||
|
||||
public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
|
||||
public final static String SPLIT_WAL_WRITER_THREADS =
|
||||
"hbase.regionserver.hlog.splitlog.writer.threads";
|
||||
|
@ -184,11 +201,7 @@ public class WALSplitter {
|
|||
}
|
||||
|
||||
/**
|
||||
* Splits a WAL file into region's recovered-edits directory.
|
||||
* This is the main entry point for distributed log splitting from SplitLogWorker.
|
||||
* <p>
|
||||
* If the log file has N regions then N recovered.edits files will be produced.
|
||||
* <p>
|
||||
* Splits a WAL file.
|
||||
* @return false if it is interrupted by the progress-able.
|
||||
*/
|
||||
public static boolean splitLogFile(Path walDir, FileStatus logfile, FileSystem walFS,
|
||||
|
@ -202,10 +215,13 @@ public class WALSplitter {
|
|||
return s.splitLogFile(logfile, reporter);
|
||||
}
|
||||
|
||||
// A wrapper to split one log folder using the method used by distributed
|
||||
// log splitting. Used by tools and unit tests. It should be package private.
|
||||
// It is public only because TestWALObserver is in a different package,
|
||||
// which uses this method to do log splitting.
|
||||
/**
|
||||
* Split a folder of WAL files. Delete the directory when done.
|
||||
* Used by tools and unit tests. It should be package private.
|
||||
* It is public only because TestWALObserver is in a different package,
|
||||
* which uses this method to do log splitting.
|
||||
* @return List of output files created by the split.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
public static List<Path> split(Path walDir, Path logDir, Path oldLogDir, FileSystem walFS,
|
||||
Configuration conf, final WALFactory factory) throws IOException {
|
||||
|
@ -233,7 +249,7 @@ public class WALSplitter {
|
|||
}
|
||||
|
||||
/**
|
||||
* log splitting implementation, splits one log file.
|
||||
* WAL splitting implementation, splits one log file.
|
||||
* @param logfile should be an actual log file.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
|
@ -285,7 +301,8 @@ public class WALSplitter {
|
|||
String encodedRegionNameAsStr = Bytes.toString(region);
|
||||
lastFlushedSequenceId = lastFlushedSequenceIds.get(encodedRegionNameAsStr);
|
||||
if (lastFlushedSequenceId == null) {
|
||||
if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(), encodedRegionNameAsStr))) {
|
||||
if (!(isRegionDirPresentUnderRoot(entry.getKey().getTableName(),
|
||||
encodedRegionNameAsStr))) {
|
||||
// The region directory itself is not present in the FS. This indicates that
|
||||
// the region/table is already removed. We can just skip all the edits for this
|
||||
// region. Setting lastFlushedSequenceId as Long.MAX_VALUE so that all edits
|
||||
|
|
|
@ -1,5 +1,4 @@
|
|||
/*
|
||||
*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -19,13 +18,12 @@
|
|||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import static org.apache.hadoop.hbase.regionserver.wal.AbstractTestWALReplay.addRegionEdits;
|
||||
import static org.apache.hadoop.hbase.wal.BoundedRecoveredHFilesOutputSink.WAL_SPLIT_TO_HFILE;
|
||||
import static org.apache.hadoop.hbase.wal.WALSplitter.WAL_SPLIT_TO_HFILE;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.PrivilegedExceptionAction;
|
||||
import java.util.ArrayList;
|
||||
|
|
Loading…
Reference in New Issue