HBASE-19358 Improve the stability of splitting log when do fail over
Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
parent
a66e923106
commit
1e352f6247
|
@ -18,11 +18,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.wal;
|
package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
|
||||||
|
|
||||||
import java.io.EOFException;
|
import java.io.EOFException;
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -31,6 +26,7 @@ import java.text.ParseException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
|
@ -71,6 +67,7 @@ import org.apache.hadoop.hbase.client.Delete;
|
||||||
import org.apache.hadoop.hbase.client.Durability;
|
import org.apache.hadoop.hbase.client.Durability;
|
||||||
import org.apache.hadoop.hbase.client.Mutation;
|
import org.apache.hadoop.hbase.client.Mutation;
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
|
||||||
import org.apache.hadoop.hbase.io.HeapSize;
|
import org.apache.hadoop.hbase.io.HeapSize;
|
||||||
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
import org.apache.hadoop.hbase.log.HBaseMarkers;
|
||||||
import org.apache.hadoop.hbase.master.SplitLogManager;
|
import org.apache.hadoop.hbase.master.SplitLogManager;
|
||||||
|
@ -80,12 +77,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
|
||||||
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
|
import org.apache.hadoop.hbase.regionserver.LastSequenceId;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
|
||||||
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
import org.apache.hadoop.hbase.util.CancelableProgressable;
|
||||||
import org.apache.hadoop.hbase.util.ClassSize;
|
import org.apache.hadoop.hbase.util.ClassSize;
|
||||||
|
@ -101,6 +92,16 @@ import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
|
||||||
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
|
||||||
/**
|
/**
|
||||||
* This class is responsible for splitting up a bunch of regionserver commit log
|
* This class is responsible for splitting up a bunch of regionserver commit log
|
||||||
* files that are no longer being written to, into new files, one per region, for
|
* files that are no longer being written to, into new files, one per region, for
|
||||||
|
@ -140,6 +141,12 @@ public class WALSplitter {
|
||||||
// the file being split currently
|
// the file being split currently
|
||||||
private FileStatus fileBeingSplit;
|
private FileStatus fileBeingSplit;
|
||||||
|
|
||||||
|
// if we limit the number of writers opened for sinking recovered edits
|
||||||
|
private final boolean splitWriterCreationBounded;
|
||||||
|
|
||||||
|
public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
|
||||||
|
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
|
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
|
||||||
FileSystem fs, LastSequenceId idChecker,
|
FileSystem fs, LastSequenceId idChecker,
|
||||||
|
@ -156,11 +163,19 @@ public class WALSplitter {
|
||||||
this.walFactory = factory;
|
this.walFactory = factory;
|
||||||
PipelineController controller = new PipelineController();
|
PipelineController controller = new PipelineController();
|
||||||
|
|
||||||
|
this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
|
||||||
|
|
||||||
entryBuffers = new EntryBuffers(controller,
|
entryBuffers = new EntryBuffers(controller,
|
||||||
this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024));
|
this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
|
||||||
|
splitWriterCreationBounded);
|
||||||
|
|
||||||
int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
|
int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
|
||||||
outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
|
if(splitWriterCreationBounded){
|
||||||
|
outputSink = new BoundedLogWriterCreationOutputSink(
|
||||||
|
controller, entryBuffers, numWriterThreads);
|
||||||
|
}else {
|
||||||
|
outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -833,10 +848,17 @@ public class WALSplitter {
|
||||||
|
|
||||||
long totalBuffered = 0;
|
long totalBuffered = 0;
|
||||||
long maxHeapUsage;
|
long maxHeapUsage;
|
||||||
|
boolean splitWriterCreationBounded;
|
||||||
|
|
||||||
public EntryBuffers(PipelineController controller, long maxHeapUsage) {
|
public EntryBuffers(PipelineController controller, long maxHeapUsage) {
|
||||||
|
this(controller, maxHeapUsage, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
public EntryBuffers(PipelineController controller, long maxHeapUsage,
|
||||||
|
boolean splitWriterCreationBounded){
|
||||||
this.controller = controller;
|
this.controller = controller;
|
||||||
this.maxHeapUsage = maxHeapUsage;
|
this.maxHeapUsage = maxHeapUsage;
|
||||||
|
this.splitWriterCreationBounded = splitWriterCreationBounded;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -876,6 +898,13 @@ public class WALSplitter {
|
||||||
* @return RegionEntryBuffer a buffer of edits to be written.
|
* @return RegionEntryBuffer a buffer of edits to be written.
|
||||||
*/
|
*/
|
||||||
synchronized RegionEntryBuffer getChunkToWrite() {
|
synchronized RegionEntryBuffer getChunkToWrite() {
|
||||||
|
// The core part of limiting opening writers is it doesn't return chunk only if the
|
||||||
|
// heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
|
||||||
|
// region during splitting. It will flush all the logs in the buffer after splitting
|
||||||
|
// through a threadpool, which means the number of writers it created is under control.
|
||||||
|
if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
long biggestSize = 0;
|
long biggestSize = 0;
|
||||||
byte[] biggestBufferKey = null;
|
byte[] biggestBufferKey = null;
|
||||||
|
|
||||||
|
@ -1054,11 +1083,10 @@ public class WALSplitter {
|
||||||
protected PipelineController controller;
|
protected PipelineController controller;
|
||||||
protected EntryBuffers entryBuffers;
|
protected EntryBuffers entryBuffers;
|
||||||
|
|
||||||
protected Map<byte[], SinkWriter> writers = Collections
|
protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
|
||||||
.synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
|
protected ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
|
||||||
|
new ConcurrentHashMap<>();
|
||||||
|
|
||||||
protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
|
|
||||||
.synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
|
|
||||||
|
|
||||||
protected final List<WriterThread> writerThreads = Lists.newArrayList();
|
protected final List<WriterThread> writerThreads = Lists.newArrayList();
|
||||||
|
|
||||||
|
@ -1105,11 +1133,10 @@ public class WALSplitter {
|
||||||
*/
|
*/
|
||||||
void updateRegionMaximumEditLogSeqNum(Entry entry) {
|
void updateRegionMaximumEditLogSeqNum(Entry entry) {
|
||||||
synchronized (regionMaximumEditLogSeqNum) {
|
synchronized (regionMaximumEditLogSeqNum) {
|
||||||
Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
|
String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
|
||||||
.getEncodedRegionName());
|
Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
|
||||||
if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
|
if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
|
||||||
regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
|
regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
|
||||||
.getSequenceId());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1266,87 +1293,24 @@ public class WALSplitter {
|
||||||
* Close all of the output streams.
|
* Close all of the output streams.
|
||||||
* @return the list of paths written.
|
* @return the list of paths written.
|
||||||
*/
|
*/
|
||||||
private List<Path> close() throws IOException {
|
List<Path> close() throws IOException {
|
||||||
Preconditions.checkState(!closeAndCleanCompleted);
|
Preconditions.checkState(!closeAndCleanCompleted);
|
||||||
|
|
||||||
final List<Path> paths = new ArrayList<>();
|
final List<Path> paths = new ArrayList<>();
|
||||||
final List<IOException> thrown = Lists.newArrayList();
|
final List<IOException> thrown = Lists.newArrayList();
|
||||||
ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
|
ThreadPoolExecutor closeThreadPool = Threads
|
||||||
TimeUnit.SECONDS, new ThreadFactory() {
|
.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
|
||||||
private int count = 1;
|
private int count = 1;
|
||||||
|
|
||||||
@Override
|
@Override public Thread newThread(Runnable r) {
|
||||||
public Thread newThread(Runnable r) {
|
Thread t = new Thread(r, "split-log-closeStream-" + count++);
|
||||||
Thread t = new Thread(r, "split-log-closeStream-" + count++);
|
return t;
|
||||||
return t;
|
}
|
||||||
}
|
});
|
||||||
});
|
|
||||||
CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
|
CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
|
||||||
for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
|
boolean progress_failed;
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
|
|
||||||
}
|
|
||||||
completionService.submit(new Callable<Void>() {
|
|
||||||
@Override
|
|
||||||
public Void call() throws Exception {
|
|
||||||
WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
|
|
||||||
if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
|
|
||||||
try {
|
|
||||||
wap.w.close();
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.error("Couldn't close log at " + wap.p, ioe);
|
|
||||||
thrown.add(ioe);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
if (LOG.isDebugEnabled()) {
|
|
||||||
LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
|
|
||||||
+ " edits, skipped " + wap.editsSkipped + " edits in "
|
|
||||||
+ (wap.nanosSpent / 1000 / 1000) + "ms");
|
|
||||||
}
|
|
||||||
if (wap.editsWritten == 0) {
|
|
||||||
// just remove the empty recovered.edits file
|
|
||||||
if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
|
|
||||||
LOG.warn("Failed deleting empty " + wap.p);
|
|
||||||
throw new IOException("Failed deleting empty " + wap.p);
|
|
||||||
}
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
|
|
||||||
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
|
|
||||||
regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
|
|
||||||
try {
|
|
||||||
if (!dst.equals(wap.p) && fs.exists(dst)) {
|
|
||||||
deleteOneWithFewerEntries(wap, dst);
|
|
||||||
}
|
|
||||||
// Skip the unit tests which create a splitter that reads and
|
|
||||||
// writes the data without touching disk.
|
|
||||||
// TestHLogSplit#testThreading is an example.
|
|
||||||
if (fs.exists(wap.p)) {
|
|
||||||
if (!fs.rename(wap.p, dst)) {
|
|
||||||
throw new IOException("Failed renaming " + wap.p + " to " + dst);
|
|
||||||
}
|
|
||||||
LOG.info("Rename " + wap.p + " to " + dst);
|
|
||||||
}
|
|
||||||
} catch (IOException ioe) {
|
|
||||||
LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
|
|
||||||
thrown.add(ioe);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
paths.add(dst);
|
|
||||||
return null;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
|
||||||
|
|
||||||
boolean progress_failed = false;
|
|
||||||
try {
|
try {
|
||||||
for (int i = 0, n = this.writers.size(); i < n; i++) {
|
progress_failed = executeCloseTask(completionService, thrown, paths);
|
||||||
Future<Void> future = completionService.take();
|
|
||||||
future.get();
|
|
||||||
if (!progress_failed && reporter != null && !reporter.progress()) {
|
|
||||||
progress_failed = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
IOException iie = new InterruptedIOException();
|
IOException iie = new InterruptedIOException();
|
||||||
iie.initCause(e);
|
iie.initCause(e);
|
||||||
|
@ -1356,7 +1320,6 @@ public class WALSplitter {
|
||||||
} finally {
|
} finally {
|
||||||
closeThreadPool.shutdownNow();
|
closeThreadPool.shutdownNow();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!thrown.isEmpty()) {
|
if (!thrown.isEmpty()) {
|
||||||
throw MultipleIOException.createIOException(thrown);
|
throw MultipleIOException.createIOException(thrown);
|
||||||
}
|
}
|
||||||
|
@ -1368,6 +1331,88 @@ public class WALSplitter {
|
||||||
return paths;
|
return paths;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @param completionService threadPool to execute the closing tasks
|
||||||
|
* @param thrown store the exceptions
|
||||||
|
* @param paths arrayList to store the paths written
|
||||||
|
* @return if close tasks executed successful
|
||||||
|
*/
|
||||||
|
boolean executeCloseTask(CompletionService<Void> completionService,
|
||||||
|
List<IOException> thrown, List<Path> paths)
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
|
||||||
|
}
|
||||||
|
completionService.submit(new Callable<Void>() {
|
||||||
|
@Override public Void call() throws Exception {
|
||||||
|
WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
|
||||||
|
Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
|
||||||
|
paths.add(dst);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
boolean progress_failed = false;
|
||||||
|
for (int i = 0, n = this.writers.size(); i < n; i++) {
|
||||||
|
Future<Void> future = completionService.take();
|
||||||
|
future.get();
|
||||||
|
if (!progress_failed && reporter != null && !reporter.progress()) {
|
||||||
|
progress_failed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return progress_failed;
|
||||||
|
}
|
||||||
|
|
||||||
|
Path closeWriter(String encodedRegionName, WriterAndPath wap,
|
||||||
|
List<IOException> thrown) throws IOException{
|
||||||
|
if (LOG.isTraceEnabled()) {
|
||||||
|
LOG.trace("Closing " + wap.p);
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
wap.w.close();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error("Couldn't close log at " + wap.p, ioe);
|
||||||
|
thrown.add(ioe);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
|
||||||
|
+ " edits, skipped " + wap.editsSkipped + " edits in "
|
||||||
|
+ (wap.nanosSpent / 1000 / 1000) + "ms");
|
||||||
|
}
|
||||||
|
if (wap.editsWritten == 0) {
|
||||||
|
// just remove the empty recovered.edits file
|
||||||
|
if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
|
||||||
|
LOG.warn("Failed deleting empty " + wap.p);
|
||||||
|
throw new IOException("Failed deleting empty " + wap.p);
|
||||||
|
}
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
|
||||||
|
regionMaximumEditLogSeqNum.get(encodedRegionName));
|
||||||
|
try {
|
||||||
|
if (!dst.equals(wap.p) && fs.exists(dst)) {
|
||||||
|
deleteOneWithFewerEntries(wap, dst);
|
||||||
|
}
|
||||||
|
// Skip the unit tests which create a splitter that reads and
|
||||||
|
// writes the data without touching disk.
|
||||||
|
// TestHLogSplit#testThreading is an example.
|
||||||
|
if (fs.exists(wap.p)) {
|
||||||
|
if (!fs.rename(wap.p, dst)) {
|
||||||
|
throw new IOException("Failed renaming " + wap.p + " to " + dst);
|
||||||
|
}
|
||||||
|
LOG.info("Rename " + wap.p + " to " + dst);
|
||||||
|
}
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
|
||||||
|
thrown.add(ioe);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return dst;
|
||||||
|
}
|
||||||
|
|
||||||
private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
|
private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
|
||||||
if (writersClosed) {
|
if (writersClosed) {
|
||||||
return thrown;
|
return thrown;
|
||||||
|
@ -1390,20 +1435,19 @@ public class WALSplitter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
synchronized (writers) {
|
WriterAndPath wap = null;
|
||||||
WriterAndPath wap = null;
|
for (SinkWriter tmpWAP : writers.values()) {
|
||||||
for (SinkWriter tmpWAP : writers.values()) {
|
try {
|
||||||
try {
|
wap = (WriterAndPath) tmpWAP;
|
||||||
wap = (WriterAndPath) tmpWAP;
|
wap.w.close();
|
||||||
wap.w.close();
|
} catch (IOException ioe) {
|
||||||
} catch (IOException ioe) {
|
LOG.error("Couldn't close log at " + wap.p, ioe);
|
||||||
LOG.error("Couldn't close log at {}", wap.p, ioe);
|
thrown.add(ioe);
|
||||||
thrown.add(ioe);
|
continue;
|
||||||
continue;
|
|
||||||
}
|
|
||||||
LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
|
|
||||||
+ (wap.nanosSpent / 1000 / 1000) + "ms)");
|
|
||||||
}
|
}
|
||||||
|
LOG.info(
|
||||||
|
"Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
|
||||||
|
/ 1000 / 1000) + "ms)");
|
||||||
}
|
}
|
||||||
writersClosed = true;
|
writersClosed = true;
|
||||||
}
|
}
|
||||||
|
@ -1416,9 +1460,10 @@ public class WALSplitter {
|
||||||
* long as multiple threads are always acting on different regions.
|
* long as multiple threads are always acting on different regions.
|
||||||
* @return null if this region shouldn't output any logs
|
* @return null if this region shouldn't output any logs
|
||||||
*/
|
*/
|
||||||
private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
|
WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
|
||||||
byte region[] = entry.getKey().getEncodedRegionName();
|
byte region[] = entry.getKey().getEncodedRegionName();
|
||||||
WriterAndPath ret = (WriterAndPath) writers.get(region);
|
String regionName = Bytes.toString(region);
|
||||||
|
WriterAndPath ret = (WriterAndPath) writers.get(regionName);
|
||||||
if (ret != null) {
|
if (ret != null) {
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -1432,14 +1477,16 @@ public class WALSplitter {
|
||||||
blacklistedRegions.add(region);
|
blacklistedRegions.add(region);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
writers.put(region, ret);
|
if(reusable) {
|
||||||
|
writers.put(regionName, ret);
|
||||||
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return a path with a write for that path. caller should close.
|
* @return a path with a write for that path. caller should close.
|
||||||
*/
|
*/
|
||||||
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
|
WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
|
||||||
Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName());
|
Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName());
|
||||||
if (regionedits == null) {
|
if (regionedits == null) {
|
||||||
return null;
|
return null;
|
||||||
|
@ -1457,7 +1504,7 @@ public class WALSplitter {
|
||||||
return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
|
return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
|
||||||
}
|
}
|
||||||
|
|
||||||
private void filterCellByStore(Entry logEntry) {
|
void filterCellByStore(Entry logEntry) {
|
||||||
Map<byte[], Long> maxSeqIdInStores =
|
Map<byte[], Long> maxSeqIdInStores =
|
||||||
regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
|
regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
|
||||||
if (MapUtils.isEmpty(maxSeqIdInStores)) {
|
if (MapUtils.isEmpty(maxSeqIdInStores)) {
|
||||||
|
@ -1488,10 +1535,14 @@ public class WALSplitter {
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void append(RegionEntryBuffer buffer) throws IOException {
|
public void append(RegionEntryBuffer buffer) throws IOException {
|
||||||
|
appendBuffer(buffer, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
|
||||||
List<Entry> entries = buffer.entryBuffer;
|
List<Entry> entries = buffer.entryBuffer;
|
||||||
if (entries.isEmpty()) {
|
if (entries.isEmpty()) {
|
||||||
LOG.warn("got an empty buffer, skipping");
|
LOG.warn("got an empty buffer, skipping");
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
WriterAndPath wap = null;
|
WriterAndPath wap = null;
|
||||||
|
@ -1502,14 +1553,14 @@ public class WALSplitter {
|
||||||
|
|
||||||
for (Entry logEntry : entries) {
|
for (Entry logEntry : entries) {
|
||||||
if (wap == null) {
|
if (wap == null) {
|
||||||
wap = getWriterAndPath(logEntry);
|
wap = getWriterAndPath(logEntry, reusable);
|
||||||
if (wap == null) {
|
if (wap == null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
// This log spews the full edit. Can be massive in the log. Enable only debugging
|
// This log spews the full edit. Can be massive in the log. Enable only debugging
|
||||||
// WAL lost edit issues.
|
// WAL lost edit issues.
|
||||||
LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
|
LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
|
||||||
}
|
}
|
||||||
return;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
filterCellByStore(logEntry);
|
filterCellByStore(logEntry);
|
||||||
|
@ -1530,6 +1581,7 @@ public class WALSplitter {
|
||||||
LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
|
LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
|
return wap;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -1549,10 +1601,8 @@ public class WALSplitter {
|
||||||
@Override
|
@Override
|
||||||
public Map<byte[], Long> getOutputCounts() {
|
public Map<byte[], Long> getOutputCounts() {
|
||||||
TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
|
||||||
synchronized (writers) {
|
for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
|
||||||
for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
|
ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
|
||||||
ret.put(entry.getKey(), entry.getValue().editsWritten);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return ret;
|
return ret;
|
||||||
}
|
}
|
||||||
|
@ -1563,6 +1613,114 @@ public class WALSplitter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
|
||||||
|
|
||||||
|
private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
public BoundedLogWriterCreationOutputSink(PipelineController controller,
|
||||||
|
EntryBuffers entryBuffers, int numWriters) {
|
||||||
|
super(controller, entryBuffers, numWriters);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public List<Path> finishWritingAndClose() throws IOException {
|
||||||
|
boolean isSuccessful;
|
||||||
|
List<Path> result;
|
||||||
|
try {
|
||||||
|
isSuccessful = finishWriting(false);
|
||||||
|
} finally {
|
||||||
|
result = close();
|
||||||
|
}
|
||||||
|
if (isSuccessful) {
|
||||||
|
splits = result;
|
||||||
|
}
|
||||||
|
return splits;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
boolean executeCloseTask(CompletionService<Void> completionService,
|
||||||
|
List<IOException> thrown, List<Path> paths)
|
||||||
|
throws InterruptedException, ExecutionException {
|
||||||
|
for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
|
||||||
|
LOG.info("Submitting writeThenClose of " + buffer.getValue().encodedRegionName);
|
||||||
|
completionService.submit(new Callable<Void>() {
|
||||||
|
public Void call() throws Exception {
|
||||||
|
Path dst = writeThenClose(buffer.getValue());
|
||||||
|
paths.add(dst);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
|
boolean progress_failed = false;
|
||||||
|
for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
|
||||||
|
Future<Void> future = completionService.take();
|
||||||
|
future.get();
|
||||||
|
if (!progress_failed && reporter != null && !reporter.progress()) {
|
||||||
|
progress_failed = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return progress_failed;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* since the splitting process may create multiple output files, we need a map
|
||||||
|
* regionRecoverStatMap to track the output count of each region.
|
||||||
|
* @return a map from encoded region ID to the number of edits written out for that region.
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public Map<byte[], Long> getOutputCounts() {
|
||||||
|
Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
|
||||||
|
for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){
|
||||||
|
regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
|
||||||
|
}
|
||||||
|
return regionRecoverStatMapResult;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return the number of recovered regions
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public int getNumberOfRecoveredRegions() {
|
||||||
|
return regionRecoverStatMap.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Append the buffer to a new recovered edits file, then close it after all done
|
||||||
|
* @param buffer contain all entries of a certain region
|
||||||
|
* @throws IOException when closeWriter failed
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void append(RegionEntryBuffer buffer) throws IOException {
|
||||||
|
writeThenClose(buffer);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
|
||||||
|
WriterAndPath wap = appendBuffer(buffer, false);
|
||||||
|
if(wap != null) {
|
||||||
|
String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
|
||||||
|
Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
|
||||||
|
if (value != null) {
|
||||||
|
Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
|
||||||
|
regionRecoverStatMap.put(encodedRegionName, newValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Path dst = null;
|
||||||
|
List<IOException> thrown = new ArrayList<>();
|
||||||
|
if(wap != null){
|
||||||
|
dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
|
||||||
|
}
|
||||||
|
if (!thrown.isEmpty()) {
|
||||||
|
throw MultipleIOException.createIOException(thrown);
|
||||||
|
}
|
||||||
|
return dst;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Class wraps the actual writer which writes data out and related statistics
|
* Class wraps the actual writer which writes data out and related statistics
|
||||||
*/
|
*/
|
||||||
|
|
|
@ -0,0 +1,35 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.regionserver.wal;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||||
|
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
|
||||||
|
@Category(MediumTests.class)
|
||||||
|
public class TestWALReplayBoundedLogWriterCreation extends TestWALReplay {
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TestWALReplay.setUpBeforeClass();
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.hbase.wal;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
|
@Category(LargeTests.class)
|
||||||
|
public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit{
|
||||||
|
@BeforeClass
|
||||||
|
public static void setUpBeforeClass() throws Exception {
|
||||||
|
TestWALSplit.setUpBeforeClass();
|
||||||
|
TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The logic of this test has conflict with the limit writers split logic, skip this test
|
||||||
|
*/
|
||||||
|
@Test(timeout=300000)
|
||||||
|
@Ignore
|
||||||
|
public void testThreadingSlowWriterSmallBuffer() throws Exception {
|
||||||
|
super.testThreadingSlowWriterSmallBuffer();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue