HBASE-19358 Improve the stability of splitting log when do fail over
Signed-off-by: Yu Li <liyu@apache.org>
This commit is contained in:
parent
e866e837fb
commit
6faed49ad1
|
@ -25,6 +25,7 @@ import java.io.InterruptedIOException;
|
|||
import java.text.ParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.LinkedList;
|
||||
import java.util.List;
|
||||
|
@ -163,6 +164,9 @@ public class WALSplitter {
|
|||
|
||||
protected boolean distributedLogReplay;
|
||||
|
||||
private final boolean splitWriterCreationBounded;
|
||||
|
||||
|
||||
// Map encodedRegionName -> lastFlushedSequenceId
|
||||
protected Map<String, Long> lastFlushedSequenceIds = new ConcurrentHashMap<String, Long>();
|
||||
|
||||
|
@ -182,6 +186,8 @@ public class WALSplitter {
|
|||
// the file being split currently
|
||||
private FileStatus fileBeingSplit;
|
||||
|
||||
public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
|
||||
|
||||
@VisibleForTesting
|
||||
WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
|
||||
FileSystem fs, LastSequenceId idChecker,
|
||||
|
@ -196,10 +202,10 @@ public class WALSplitter {
|
|||
this.csm = (BaseCoordinatedStateManager)csm;
|
||||
this.walFactory = factory;
|
||||
this.controller = new PipelineController();
|
||||
|
||||
this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
|
||||
entryBuffers = new EntryBuffers(controller,
|
||||
this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize",
|
||||
128*1024*1024));
|
||||
128*1024*1024), splitWriterCreationBounded);
|
||||
|
||||
// a larger minBatchSize may slow down recovery because replay writer has to wait for
|
||||
// enough edits before replaying them
|
||||
|
@ -214,7 +220,12 @@ public class WALSplitter {
|
|||
LOG.info("ZooKeeperWatcher is passed in as NULL so disable distrubitedLogRepaly.");
|
||||
}
|
||||
this.distributedLogReplay = false;
|
||||
outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
|
||||
if(splitWriterCreationBounded){
|
||||
outputSink = new BoundedLogWriterCreationOutputSink(controller,
|
||||
entryBuffers, numWriterThreads);
|
||||
}else {
|
||||
outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -925,11 +936,19 @@ public class WALSplitter {
|
|||
Set<byte[]> currentlyWriting = new TreeSet<byte[]>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
long totalBuffered = 0;
|
||||
long maxHeapUsage;
|
||||
final long maxHeapUsage;
|
||||
boolean splitWriterCreationBounded;
|
||||
|
||||
|
||||
public EntryBuffers(PipelineController controller, long maxHeapUsage) {
|
||||
this(controller, maxHeapUsage, false);
|
||||
}
|
||||
|
||||
public EntryBuffers(PipelineController controller, long maxHeapUsage,
|
||||
boolean splitWriterCreationBounded) {
|
||||
this.controller = controller;
|
||||
this.maxHeapUsage = maxHeapUsage;
|
||||
this.splitWriterCreationBounded = splitWriterCreationBounded;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -969,6 +988,14 @@ public class WALSplitter {
|
|||
* @return RegionEntryBuffer a buffer of edits to be written or replayed.
|
||||
*/
|
||||
synchronized RegionEntryBuffer getChunkToWrite() {
|
||||
// The core part of limiting opening writers is it doesn't return chunk only if the heap size
|
||||
// is over maxHeapUsage. Thus it doesn't need to create a writer for each region
|
||||
// during splitting. It will flush all the logs in the buffer after splitting through a
|
||||
// threadpool, which means the number of writers it created is under control
|
||||
if(splitWriterCreationBounded && totalBuffered < maxHeapUsage){
|
||||
return null;
|
||||
}
|
||||
|
||||
long biggestSize = 0;
|
||||
byte[] biggestBufferKey = null;
|
||||
|
||||
|
@ -1147,11 +1174,9 @@ public class WALSplitter {
|
|||
protected PipelineController controller;
|
||||
protected EntryBuffers entryBuffers;
|
||||
|
||||
protected final Map<byte[], SinkWriter> writers = Collections
|
||||
.synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
|
||||
|
||||
protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
|
||||
.synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
|
||||
protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
|
||||
protected ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
protected final List<WriterThread> writerThreads = Lists.newArrayList();
|
||||
|
||||
|
@ -1197,18 +1222,19 @@ public class WALSplitter {
|
|||
* Update region's maximum edit log SeqNum.
|
||||
*/
|
||||
void updateRegionMaximumEditLogSeqNum(Entry entry) {
|
||||
|
||||
synchronized (regionMaximumEditLogSeqNum) {
|
||||
Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
|
||||
.getEncodedRegionName());
|
||||
String encodedRegionName = Bytes.toString(entry.getKey().getEncodedRegionName());
|
||||
Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(encodedRegionName);
|
||||
if (currentMaxSeqNum == null || entry.getKey().getLogSeqNum() > currentMaxSeqNum) {
|
||||
regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
|
||||
regionMaximumEditLogSeqNum.put(encodedRegionName, entry.getKey()
|
||||
.getLogSeqNum());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Long getRegionMaximumEditLogSeqNum(byte[] region) {
|
||||
return regionMaximumEditLogSeqNum.get(region);
|
||||
return regionMaximumEditLogSeqNum.get(Bytes.toString(region));
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1330,7 +1356,7 @@ public class WALSplitter {
|
|||
}
|
||||
|
||||
// delete the one with fewer wal entries
|
||||
private void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
|
||||
void deleteOneWithFewerEntries(WriterAndPath wap, Path dst) throws IOException {
|
||||
long dstMinLogSeqNum = -1L;
|
||||
try (WAL.Reader reader = walFactory.createReader(fs, dst)) {
|
||||
WAL.Entry entry = reader.next();
|
||||
|
@ -1366,7 +1392,7 @@ public class WALSplitter {
|
|||
* Close all of the output streams.
|
||||
* @return the list of paths written.
|
||||
*/
|
||||
private List<Path> close() throws IOException {
|
||||
List<Path> close() throws IOException {
|
||||
Preconditions.checkState(!closeAndCleanCompleted);
|
||||
|
||||
final List<Path> paths = new ArrayList<Path>();
|
||||
|
@ -1383,71 +1409,9 @@ public class WALSplitter {
|
|||
});
|
||||
CompletionService<Void> completionService =
|
||||
new ExecutorCompletionService<Void>(closeThreadPool);
|
||||
for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p);
|
||||
}
|
||||
completionService.submit(new Callable<Void>() {
|
||||
@Override
|
||||
public Void call() throws Exception {
|
||||
WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
|
||||
if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
|
||||
try {
|
||||
wap.w.close();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Couldn't close log at " + wap.p, ioe);
|
||||
thrown.add(ioe);
|
||||
return null;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
|
||||
+ " edits, skipped " + wap.editsSkipped + " edits in "
|
||||
+ (wap.nanosSpent / 1000 / 1000) + "ms");
|
||||
}
|
||||
if (wap.editsWritten == 0) {
|
||||
// just remove the empty recovered.edits file
|
||||
if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
|
||||
LOG.warn("Failed deleting empty " + wap.p);
|
||||
throw new IOException("Failed deleting empty " + wap.p);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
|
||||
regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
|
||||
try {
|
||||
if (!dst.equals(wap.p) && fs.exists(dst)) {
|
||||
deleteOneWithFewerEntries(wap, dst);
|
||||
}
|
||||
// Skip the unit tests which create a splitter that reads and
|
||||
// writes the data without touching disk.
|
||||
// TestHLogSplit#testThreading is an example.
|
||||
if (fs.exists(wap.p)) {
|
||||
if (!fs.rename(wap.p, dst)) {
|
||||
throw new IOException("Failed renaming " + wap.p + " to " + dst);
|
||||
}
|
||||
LOG.info("Rename " + wap.p + " to " + dst);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
|
||||
thrown.add(ioe);
|
||||
return null;
|
||||
}
|
||||
paths.add(dst);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
boolean progress_failed = false;
|
||||
try {
|
||||
for (int i = 0, n = this.writers.size(); i < n; i++) {
|
||||
Future<Void> future = completionService.take();
|
||||
future.get();
|
||||
if (!progress_failed && reporter != null && !reporter.progress()) {
|
||||
progress_failed = true;
|
||||
}
|
||||
}
|
||||
boolean progress_failed;
|
||||
try{
|
||||
progress_failed = executeCloseTask(completionService, thrown, paths);
|
||||
} catch (InterruptedException e) {
|
||||
IOException iie = new InterruptedIOException();
|
||||
iie.initCause(e);
|
||||
|
@ -1469,6 +1433,83 @@ public class WALSplitter {
|
|||
return paths;
|
||||
}
|
||||
|
||||
boolean executeCloseTask(CompletionService<Void> completionService,
|
||||
final List<IOException> thrown, final List<Path> paths)
|
||||
throws InterruptedException, ExecutionException {
|
||||
for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
|
||||
}
|
||||
completionService.submit(new Callable<Void>() {
|
||||
@Override public Void call() throws Exception {
|
||||
WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
|
||||
Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
|
||||
paths.add(dst);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
boolean progress_failed = false;
|
||||
for (int i = 0, n = this.writers.size(); i < n; i++) {
|
||||
Future<Void> future = completionService.take();
|
||||
future.get();
|
||||
if (!progress_failed && reporter != null && !reporter.progress()) {
|
||||
progress_failed = true;
|
||||
}
|
||||
}
|
||||
return progress_failed;
|
||||
}
|
||||
|
||||
Path closeWriter(String encodedRegionName, WriterAndPath wap, List<IOException> thrown)
|
||||
throws IOException {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace("Closing " + wap.p);
|
||||
}
|
||||
try {
|
||||
wap.w.close();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Couldn't close log at " + wap.p, ioe);
|
||||
thrown.add(ioe);
|
||||
return null;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten
|
||||
+ " edits, skipped " + wap.editsSkipped + " edits in "
|
||||
+ (wap.nanosSpent / 1000 / 1000) + "ms");
|
||||
}
|
||||
if (wap.editsWritten == 0) {
|
||||
// just remove the empty recovered.edits file
|
||||
if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
|
||||
LOG.warn("Failed deleting empty " + wap.p);
|
||||
throw new IOException("Failed deleting empty " + wap.p);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
Path dst = getCompletedRecoveredEditsFilePath(wap.p,
|
||||
regionMaximumEditLogSeqNum.get(encodedRegionName));
|
||||
try {
|
||||
if (!dst.equals(wap.p) && fs.exists(dst)) {
|
||||
deleteOneWithFewerEntries(wap, dst);
|
||||
}
|
||||
// Skip the unit tests which create a splitter that reads and
|
||||
// writes the data without touching disk.
|
||||
// TestHLogSplit#testThreading is an example.
|
||||
if (fs.exists(wap.p)) {
|
||||
if (!fs.rename(wap.p, dst)) {
|
||||
throw new IOException("Failed renaming " + wap.p + " to " + dst);
|
||||
}
|
||||
LOG.info("Rename " + wap.p + " to " + dst);
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
|
||||
thrown.add(ioe);
|
||||
return null;
|
||||
}
|
||||
return dst;
|
||||
}
|
||||
|
||||
|
||||
private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
|
||||
if (writersClosed) {
|
||||
return thrown;
|
||||
|
@ -1492,20 +1533,19 @@ public class WALSplitter {
|
|||
}
|
||||
}
|
||||
} finally {
|
||||
synchronized (writers) {
|
||||
WriterAndPath wap = null;
|
||||
for (SinkWriter tmpWAP : writers.values()) {
|
||||
try {
|
||||
wap = (WriterAndPath) tmpWAP;
|
||||
wap.w.close();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Couldn't close log at " + wap.p, ioe);
|
||||
thrown.add(ioe);
|
||||
continue;
|
||||
}
|
||||
LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
|
||||
+ (wap.nanosSpent / 1000 / 1000) + "ms)");
|
||||
WriterAndPath wap = null;
|
||||
for (SinkWriter tmpWAP : writers.values()) {
|
||||
try {
|
||||
wap = (WriterAndPath) tmpWAP;
|
||||
wap.w.close();
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Couldn't close log at " + wap.p, ioe);
|
||||
thrown.add(ioe);
|
||||
continue;
|
||||
}
|
||||
LOG.info(
|
||||
"Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
|
||||
/ 1000 / 1000) + "ms)");
|
||||
}
|
||||
writersClosed = true;
|
||||
}
|
||||
|
@ -1518,9 +1558,10 @@ public class WALSplitter {
|
|||
* long as multiple threads are always acting on different regions.
|
||||
* @return null if this region shouldn't output any logs
|
||||
*/
|
||||
private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
|
||||
WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
|
||||
byte region[] = entry.getKey().getEncodedRegionName();
|
||||
WriterAndPath ret = (WriterAndPath) writers.get(region);
|
||||
String regionName = Bytes.toString(region);
|
||||
WriterAndPath ret = (WriterAndPath) writers.get(regionName);
|
||||
if (ret != null) {
|
||||
return ret;
|
||||
}
|
||||
|
@ -1534,14 +1575,17 @@ public class WALSplitter {
|
|||
blacklistedRegions.add(region);
|
||||
return null;
|
||||
}
|
||||
writers.put(region, ret);
|
||||
|
||||
if(reusable) {
|
||||
writers.put(regionName, ret);
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* @return a path with a write for that path. caller should close.
|
||||
*/
|
||||
private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
|
||||
WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
|
||||
Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir,
|
||||
fileBeingSplit.getPath().getName());
|
||||
if (regionedits == null) {
|
||||
|
@ -1560,7 +1604,7 @@ public class WALSplitter {
|
|||
return new WriterAndPath(regionedits, w, entry.getKey().getLogSeqNum());
|
||||
}
|
||||
|
||||
private void filterCellByStore(Entry logEntry) {
|
||||
void filterCellByStore(Entry logEntry) {
|
||||
Map<byte[], Long> maxSeqIdInStores =
|
||||
regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
|
||||
if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
|
||||
|
@ -1591,12 +1635,16 @@ public class WALSplitter {
|
|||
|
||||
@Override
|
||||
public void append(RegionEntryBuffer buffer) throws IOException {
|
||||
appendBuffer(buffer, true);
|
||||
}
|
||||
|
||||
|
||||
WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{
|
||||
List<Entry> entries = buffer.entryBuffer;
|
||||
if (entries.isEmpty()) {
|
||||
LOG.warn("got an empty buffer, skipping");
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
|
||||
WriterAndPath wap = null;
|
||||
|
||||
long startTime = System.nanoTime();
|
||||
|
@ -1605,12 +1653,12 @@ public class WALSplitter {
|
|||
|
||||
for (Entry logEntry : entries) {
|
||||
if (wap == null) {
|
||||
wap = getWriterAndPath(logEntry);
|
||||
wap = getWriterAndPath(logEntry, reusable);
|
||||
if (wap == null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("getWriterAndPath decided we don't need to write edits for " + logEntry);
|
||||
}
|
||||
return;
|
||||
return null;
|
||||
}
|
||||
}
|
||||
filterCellByStore(logEntry);
|
||||
|
@ -1630,6 +1678,7 @@ public class WALSplitter {
|
|||
LOG.fatal(" Got while writing log entry to log", e);
|
||||
throw e;
|
||||
}
|
||||
return wap;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -1650,8 +1699,8 @@ public class WALSplitter {
|
|||
public Map<byte[], Long> getOutputCounts() {
|
||||
TreeMap<byte[], Long> ret = new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR);
|
||||
synchronized (writers) {
|
||||
for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
|
||||
ret.put(entry.getKey(), entry.getValue().editsWritten);
|
||||
for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
|
||||
ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
|
@ -1663,6 +1712,105 @@ public class WALSplitter {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Class that will limit the number of hdfs writers we create to split the logs
|
||||
*/
|
||||
class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink{
|
||||
|
||||
ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
|
||||
|
||||
|
||||
public BoundedLogWriterCreationOutputSink(PipelineController controller,
|
||||
EntryBuffers entryBuffers, int numWriters){
|
||||
super(controller, entryBuffers, numWriters);
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<Path> finishWritingAndClose() throws IOException {
|
||||
boolean isSuccessful;
|
||||
List<Path> result;
|
||||
try {
|
||||
isSuccessful = finishWriting(false);
|
||||
} finally {
|
||||
result = close();
|
||||
}
|
||||
if (isSuccessful) {
|
||||
splits = result;
|
||||
}
|
||||
return splits;
|
||||
}
|
||||
|
||||
@Override
|
||||
boolean executeCloseTask(CompletionService<Void> closeCompletionService,
|
||||
final List<IOException> thrown, final List<Path> paths)
|
||||
throws InterruptedException, ExecutionException {
|
||||
for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
|
||||
LOG.info("Submitting write then close of " +
|
||||
Bytes.toString(buffer.getValue().encodedRegionName));
|
||||
closeCompletionService.submit(new Callable<Void>() {
|
||||
public Void call() throws Exception {
|
||||
Path dst = writeThenClose(buffer.getValue());
|
||||
paths.add(dst);
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
boolean progress_failed = false;
|
||||
for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
|
||||
Future<Void> future = closeCompletionService.take();
|
||||
future.get();
|
||||
if (!progress_failed && reporter != null && !reporter.progress()) {
|
||||
progress_failed = true;
|
||||
}
|
||||
}
|
||||
return progress_failed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public Map<byte[], Long> getOutputCounts() {
|
||||
Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
|
||||
for(Map.Entry<String, Long> entry: regionRecoverStatMap.entrySet()){
|
||||
regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
|
||||
}
|
||||
return regionRecoverStatMapResult;
|
||||
}
|
||||
|
||||
@Override
|
||||
public int getNumberOfRecoveredRegions() {
|
||||
return regionRecoverStatMap.size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void append(RegionEntryBuffer buffer) throws IOException {
|
||||
writeThenClose(buffer);
|
||||
}
|
||||
|
||||
private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
|
||||
WriterAndPath wap = appendBuffer(buffer, false);
|
||||
Path dst = null;
|
||||
if(wap != null){
|
||||
String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
|
||||
Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
|
||||
if(value != null){
|
||||
Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
|
||||
regionRecoverStatMap.put(encodedRegionName, newValue);
|
||||
}
|
||||
}
|
||||
|
||||
List<IOException> thrown = new ArrayList<>();
|
||||
if(wap != null) {
|
||||
dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
|
||||
}
|
||||
|
||||
if(!thrown.isEmpty()){
|
||||
throw MultipleIOException.createIOException(thrown);
|
||||
}
|
||||
return dst;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Class wraps the actual writer which writes data out and related statistics
|
||||
*/
|
||||
|
@ -1722,7 +1870,7 @@ public class WALSplitter {
|
|||
.synchronizedMap(new TreeMap<TableName, HConnection>());
|
||||
/**
|
||||
* Map key -> value layout
|
||||
* <servername>:<table name> -> Queue<Row>
|
||||
* servername:table name -> Queue(Row)
|
||||
*/
|
||||
private final Map<String, List<Pair<HRegionLocation, Entry>>> serverToBufferQueueMap =
|
||||
new ConcurrentHashMap<String, List<Pair<HRegionLocation, Entry>>>();
|
||||
|
|
|
@ -0,0 +1,33 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.regionserver.wal;
|
||||
|
||||
import org.apache.hadoop.hbase.testclassification.MediumTests;
|
||||
import org.apache.hadoop.hbase.wal.WALSplitter;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(MediumTests.class)
|
||||
public class TestWALReplayBoundedLogWriterCreation extends TestWALReplay {
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TestWALReplay.setUpBeforeClass();
|
||||
TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
|
||||
}
|
||||
}
|
|
@ -0,0 +1,44 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.wal;
|
||||
|
||||
import org.apache.hadoop.hbase.testclassification.LargeTests;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Ignore;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
|
||||
@Category(LargeTests.class)
|
||||
public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit{
|
||||
@BeforeClass
|
||||
public static void setUpBeforeClass() throws Exception {
|
||||
TestWALSplit.setUpBeforeClass();
|
||||
TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* The logic of this test is conflict with the limit writers split logic, skip this test
|
||||
*/
|
||||
@Test(timeout=300000)
|
||||
@Ignore
|
||||
public void testThreadingSlowWriterSmallBuffer() throws Exception {
|
||||
super.testThreadingSlowWriterSmallBuffer();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue