HBASE-19358 Improve the stability of splitting log when do fail over

Signed-off-by: Yu Li <liyu@apache.org>
Jingyun Tian 2018-01-02 19:17:34 +08:00 committed by Yu Li
parent 4e9f4abb14
commit f6f57d38f7
3 changed files with 358 additions and 121 deletions
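For reference, the whole change hangs off one new boolean key, hbase.split.writer.creation.bounded, which defaults to false. A minimal sketch of flipping it on programmatically (assumed usage, not part of the patch; the WALSplitter constant is introduced below):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.wal.WALSplitter;

public class EnableBoundedSplitWriters {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // Opt in to bounded writer creation during WAL splitting; off by default.
    conf.setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
    System.out.println(conf.getBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, false));
  }
}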

File: hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java

@@ -18,11 +18,6 @@
  */
 package org.apache.hadoop.hbase.wal;
-import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
-import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
-import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
 import java.io.EOFException;
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -30,6 +25,7 @@ import java.io.InterruptedIOException;
 import java.text.ParseException;
 import java.util.ArrayList;
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
@@ -68,6 +64,7 @@ import org.apache.hadoop.hbase.client.Delete;
 import org.apache.hadoop.hbase.client.Durability;
 import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
 import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.log.HBaseMarkers;
 import org.apache.hadoop.hbase.master.SplitLogManager;
@@ -77,12 +74,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion;
 import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
-import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
-import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
 import org.apache.hadoop.hbase.util.ClassSize;
@@ -99,6 +90,16 @@ import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
  * files that are no longer being written to, into new files, one per region, for
@@ -138,6 +139,12 @@ public class WALSplitter {
   // the file being split currently
   private FileStatus fileBeingSplit;
 
+  // if we limit the number of writers opened for sinking recovered edits
+  private final boolean splitWriterCreationBounded;
+
+  public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
+
   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path rootDir,
       FileSystem fs, LastSequenceId idChecker,
@@ -154,12 +161,20 @@ public class WALSplitter {
     this.walFactory = factory;
     PipelineController controller = new PipelineController();
+    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
 
     entryBuffers = new EntryBuffers(controller,
-        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024));
+        this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
+        splitWriterCreationBounded);
 
     int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
-    outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+    if (splitWriterCreationBounded) {
+      outputSink = new BoundedLogWriterCreationOutputSink(controller, entryBuffers,
+          numWriterThreads);
+    } else {
+      outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads);
+    }
   }
 
   /**
   * Splits a WAL file into region's recovered-edits directory.
@@ -840,10 +855,17 @@ public class WALSplitter {
     long totalBuffered = 0;
     long maxHeapUsage;
+    boolean splitWriterCreationBounded;
 
     public EntryBuffers(PipelineController controller, long maxHeapUsage) {
+      this(controller, maxHeapUsage, false);
+    }
+
+    public EntryBuffers(PipelineController controller, long maxHeapUsage,
+        boolean splitWriterCreationBounded) {
       this.controller = controller;
       this.maxHeapUsage = maxHeapUsage;
+      this.splitWriterCreationBounded = splitWriterCreationBounded;
     }
 
     /**
@@ -884,6 +906,13 @@ public class WALSplitter {
      * @return RegionEntryBuffer a buffer of edits to be written.
      */
     synchronized RegionEntryBuffer getChunkToWrite() {
+      // The core of the writer-creation bound: return a chunk only once the buffered
+      // heap size exceeds maxHeapUsage, so a writer need not be opened for every region
+      // while the file is being split. After splitting, the remaining buffers are
+      // flushed through a thread pool, which keeps the number of writers under control.
+      if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
+        return null;
+      }
       long biggestSize = 0;
       byte[] biggestBufferKey = null;
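To make the effect of that gate concrete, here is a self-contained toy version of the same pattern (illustrative only, not HBase code): nothing is handed to consumers until the buffered total crosses the bound, so the number of downstream writers tracks the consumer pool size instead of the region count.

import java.util.AbstractMap;
import java.util.HashMap;
import java.util.Map;

class BoundedChunkBuffer {
  private final long maxHeap;
  private long totalBuffered = 0;
  private final Map<String, StringBuilder> buffers = new HashMap<>();

  BoundedChunkBuffer(long maxHeap) {
    this.maxHeap = maxHeap;
  }

  synchronized void append(String region, String edit) {
    buffers.computeIfAbsent(region, r -> new StringBuilder()).append(edit);
    totalBuffered += edit.length();
  }

  // Mirrors the gated getChunkToWrite: below the bound, hand out nothing.
  synchronized Map.Entry<String, StringBuilder> getChunkToWrite() {
    if (totalBuffered < maxHeap) {
      return null;
    }
    String biggestKey = null;
    for (Map.Entry<String, StringBuilder> e : buffers.entrySet()) {
      if (biggestKey == null || e.getValue().length() > buffers.get(biggestKey).length()) {
        biggestKey = e.getKey();
      }
    }
    StringBuilder chunk = buffers.remove(biggestKey);
    totalBuffered -= chunk.length();
    return new AbstractMap.SimpleImmutableEntry<>(biggestKey, chunk);
  }
}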
@@ -1062,11 +1091,10 @@ public class WALSplitter {
     protected PipelineController controller;
     protected EntryBuffers entryBuffers;
 
-    protected Map<byte[], SinkWriter> writers = Collections
-        .synchronizedMap(new TreeMap<byte[], SinkWriter>(Bytes.BYTES_COMPARATOR));;
-    protected final Map<byte[], Long> regionMaximumEditLogSeqNum = Collections
-        .synchronizedMap(new TreeMap<byte[], Long>(Bytes.BYTES_COMPARATOR));
+    protected ConcurrentHashMap<String, SinkWriter> writers = new ConcurrentHashMap<>();
+    protected ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
+        new ConcurrentHashMap<>();
 
     protected final List<WriterThread> writerThreads = Lists.newArrayList();
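A note on why the key type changes from byte[] to String here: the old synchronized TreeMap could order byte[] keys with Bytes.BYTES_COMPARATOR, but ConcurrentHashMap accepts no comparator, and Java arrays hash by identity, so logically equal byte[] keys would never match. A quick standalone demonstration:

import java.util.concurrent.ConcurrentHashMap;

public class ByteArrayKeyDemo {
  public static void main(String[] args) {
    ConcurrentHashMap<byte[], Long> byBytes = new ConcurrentHashMap<>();
    byBytes.put("region1".getBytes(), 42L);
    // Prints null: a fresh byte[] with identical content is a different key.
    System.out.println(byBytes.get("region1".getBytes()));

    ConcurrentHashMap<String, Long> byString = new ConcurrentHashMap<>();
    byString.put("region1", 42L);
    // Prints 42: String keys compare by value.
    System.out.println(byString.get("region1"));
  }
}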
@@ -1113,11 +1141,10 @@ public class WALSplitter {
      */
     void updateRegionMaximumEditLogSeqNum(Entry entry) {
       synchronized (regionMaximumEditLogSeqNum) {
-        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey()
-            .getEncodedRegionName());
+        String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
+        Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
         if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
-          regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey()
-              .getSequenceId());
+          regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
         }
       }
     }
@@ -1277,31 +1304,82 @@ public class WALSplitter {
      * Close all of the output streams.
      * @return the list of paths written.
      */
-    private List<Path> close() throws IOException {
+    List<Path> close() throws IOException {
       Preconditions.checkState(!closeAndCleanCompleted);
 
       final List<Path> paths = new ArrayList<>();
       final List<IOException> thrown = Lists.newArrayList();
-      ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L,
-        TimeUnit.SECONDS, new ThreadFactory() {
+      ThreadPoolExecutor closeThreadPool = Threads
+          .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
             private int count = 1;
 
-          @Override
-          public Thread newThread(Runnable r) {
+            @Override public Thread newThread(Runnable r) {
               Thread t = new Thread(r, "split-log-closeStream-" + count++);
               return t;
             }
           });
       CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
-      for (final Map.Entry<byte[], SinkWriter> writersEntry : writers.entrySet()) {
+      boolean progress_failed;
+      try {
+        progress_failed = executeCloseTask(completionService, thrown, paths);
+      } catch (InterruptedException e) {
+        IOException iie = new InterruptedIOException();
+        iie.initCause(e);
+        throw iie;
+      } catch (ExecutionException e) {
+        throw new IOException(e.getCause());
+      } finally {
+        closeThreadPool.shutdownNow();
+      }
+      if (!thrown.isEmpty()) {
+        throw MultipleIOException.createIOException(thrown);
+      }
+      writersClosed = true;
+      closeAndCleanCompleted = true;
+      if (progress_failed) {
+        return null;
+      }
+      return paths;
+    }
+
+    /**
+     * @param completionService threadPool to execute the closing tasks
+     * @param thrown store the exceptions
+     * @param paths arrayList to store the paths written
+     * @return true if the close tasks executed successfully
+     */
+    boolean executeCloseTask(CompletionService<Void> completionService,
+        List<IOException> thrown, List<Path> paths)
+        throws InterruptedException, ExecutionException {
+      for (final Map.Entry<String, SinkWriter> writersEntry : writers.entrySet()) {
         if (LOG.isTraceEnabled()) {
           LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p);
         }
         completionService.submit(new Callable<Void>() {
-          @Override
-          public Void call() throws Exception {
+          @Override public Void call() throws Exception {
             WriterAndPath wap = (WriterAndPath) writersEntry.getValue();
-            if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p);
+            Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
+            paths.add(dst);
+            return null;
+          }
+        });
+      }
+      boolean progress_failed = false;
+      for (int i = 0, n = this.writers.size(); i < n; i++) {
+        Future<Void> future = completionService.take();
+        future.get();
+        if (!progress_failed && reporter != null && !reporter.progress()) {
+          progress_failed = true;
+        }
+      }
+      return progress_failed;
+    }
+
+    Path closeWriter(String encodedRegionName, WriterAndPath wap,
+        List<IOException> thrown) throws IOException {
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Closing " + wap.p);
+      }
       try {
         wap.w.close();
       } catch (IOException ioe) {
@@ -1324,7 +1402,7 @@ public class WALSplitter {
       }
 
       Path dst = getCompletedRecoveredEditsFilePath(wap.p,
-          regionMaximumEditLogSeqNum.get(writersEntry.getKey()));
+          regionMaximumEditLogSeqNum.get(encodedRegionName));
       try {
         if (!dst.equals(wap.p) && fs.exists(dst)) {
           deleteOneWithFewerEntries(wap, dst);
@@ -1343,40 +1421,7 @@ public class WALSplitter {
         thrown.add(ioe);
         return null;
       }
-            paths.add(dst);
-            return null;
-          }
-        });
-      }
-
-      boolean progress_failed = false;
-      try {
-        for (int i = 0, n = this.writers.size(); i < n; i++) {
-          Future<Void> future = completionService.take();
-          future.get();
-          if (!progress_failed && reporter != null && !reporter.progress()) {
-            progress_failed = true;
-          }
-        }
-      } catch (InterruptedException e) {
-        IOException iie = new InterruptedIOException();
-        iie.initCause(e);
-        throw iie;
-      } catch (ExecutionException e) {
-        throw new IOException(e.getCause());
-      } finally {
-        closeThreadPool.shutdownNow();
-      }
-      if (!thrown.isEmpty()) {
-        throw MultipleIOException.createIOException(thrown);
-      }
-      writersClosed = true;
-      closeAndCleanCompleted = true;
-      if (progress_failed) {
-        return null;
-      }
-      return paths;
+      return dst;
     }
 
     private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
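The extracted executeCloseTask keeps the original close pattern: submit one task per writer to a CompletionService, then take() futures in completion order so the reporter can be polled between finishes. A minimal standalone sketch of that pattern (toy tasks and an assumed pool size, not the HBase code):

import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CompletionServiceSketch {
  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(3);
    CompletionService<String> cs = new ExecutorCompletionService<>(pool);
    for (int i = 0; i < 5; i++) {
      final int id = i;
      cs.submit(() -> "closed-writer-" + id);
    }
    // take() yields futures as tasks finish, so progress (or a stalled
    // reporter) can be observed after every completed close.
    for (int i = 0; i < 5; i++) {
      System.out.println(cs.take().get());
    }
    pool.shutdown();
  }
}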
@@ -1402,7 +1447,6 @@ public class WALSplitter {
           }
         }
       } finally {
-        synchronized (writers) {
         WriterAndPath wap = null;
         for (SinkWriter tmpWAP : writers.values()) {
           try {
@@ -1413,9 +1457,9 @@ public class WALSplitter {
             thrown.add(ioe);
             continue;
           }
-          LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in "
-              + (wap.nanosSpent / 1000 / 1000) + "ms)");
-        }
+          LOG.info(
+              "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent
+                  / 1000 / 1000) + "ms)");
         }
         writersClosed = true;
       }
@@ -1428,9 +1472,10 @@ public class WALSplitter {
      * long as multiple threads are always acting on different regions.
      * @return null if this region shouldn't output any logs
      */
-    private WriterAndPath getWriterAndPath(Entry entry) throws IOException {
+    WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException {
       byte region[] = entry.getKey().getEncodedRegionName();
-      WriterAndPath ret = (WriterAndPath) writers.get(region);
+      String regionName = Bytes.toString(region);
+      WriterAndPath ret = (WriterAndPath) writers.get(regionName);
       if (ret != null) {
         return ret;
       }
@@ -1444,14 +1489,16 @@ public class WALSplitter {
         blacklistedRegions.add(region);
         return null;
       }
-      writers.put(region, ret);
+      if (reusable) {
+        writers.put(regionName, ret);
+      }
       return ret;
     }
 
     /**
      * @return a path with a write for that path. caller should close.
      */
-    private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
+    WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException {
       Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName());
       if (regionedits == null) {
         return null;
@@ -1469,7 +1516,7 @@ public class WALSplitter {
       return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
     }
 
-    private void filterCellByStore(Entry logEntry) {
+    void filterCellByStore(Entry logEntry) {
       Map<byte[], Long> maxSeqIdInStores =
           regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
       if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
@@ -1500,10 +1547,14 @@ public class WALSplitter {
 
     @Override
     public void append(RegionEntryBuffer buffer) throws IOException {
+      appendBuffer(buffer, true);
+    }
+
+    WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException {
       List<Entry> entries = buffer.entryBuffer;
       if (entries.isEmpty()) {
         LOG.warn("got an empty buffer, skipping");
-        return;
+        return null;
       }
 
       WriterAndPath wap = null;
@@ -1514,14 +1565,14 @@ public class WALSplitter {
 
       for (Entry logEntry : entries) {
         if (wap == null) {
-          wap = getWriterAndPath(logEntry);
+          wap = getWriterAndPath(logEntry, reusable);
           if (wap == null) {
             if (LOG.isTraceEnabled()) {
               // This log spews the full edit. Can be massive in the log. Enable only debugging
               // WAL lost edit issues.
               LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry);
             }
-            return;
+            return null;
           }
         }
         filterCellByStore(logEntry);
@@ -1542,6 +1593,7 @@ public class WALSplitter {
         LOG.error(HBaseMarkers.FATAL, " Got while writing log entry to log", e);
         throw e;
       }
+      return wap;
     }
 
     @Override
@@ -1561,10 +1613,8 @@ public class WALSplitter {
     @Override
     public Map<byte[], Long> getOutputCounts() {
       TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
-      synchronized (writers) {
-        for (Map.Entry<byte[], SinkWriter> entry : writers.entrySet()) {
-          ret.put(entry.getKey(), entry.getValue().editsWritten);
-        }
+      for (Map.Entry<String, SinkWriter> entry : writers.entrySet()) {
+        ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
       }
       return ret;
     }
@@ -1575,6 +1625,114 @@ public class WALSplitter {
     }
   }
 
+  /**
+   * An output sink that bounds the number of recovered-edits writers created during
+   * splitting, by writing and closing each region's buffered edits in one step.
+   */
+  class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
+
+    private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
+
+    public BoundedLogWriterCreationOutputSink(PipelineController controller,
+        EntryBuffers entryBuffers, int numWriters) {
+      super(controller, entryBuffers, numWriters);
+    }
+
+    @Override
+    public List<Path> finishWritingAndClose() throws IOException {
+      boolean isSuccessful;
+      List<Path> result;
+      try {
+        isSuccessful = finishWriting(false);
+      } finally {
+        result = close();
+      }
+      if (isSuccessful) {
+        splits = result;
+      }
+      return splits;
+    }
+
+    @Override
+    boolean executeCloseTask(CompletionService<Void> completionService,
+        List<IOException> thrown, List<Path> paths)
+        throws InterruptedException, ExecutionException {
+      for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
+        LOG.info("Submitting writeThenClose of " + buffer.getValue().encodedRegionName);
+        completionService.submit(new Callable<Void>() {
+          public Void call() throws Exception {
+            Path dst = writeThenClose(buffer.getValue());
+            paths.add(dst);
+            return null;
+          }
+        });
+      }
+      boolean progress_failed = false;
+      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
+        Future<Void> future = completionService.take();
+        future.get();
+        if (!progress_failed && reporter != null && !reporter.progress()) {
+          progress_failed = true;
+        }
+      }
+      return progress_failed;
+    }
+
+    /**
+     * Since the splitting process may create multiple output files, we need a map
+     * regionRecoverStatMap to track the output count of each region.
+     * @return a map from encoded region ID to the number of edits written out for that region.
+     */
+    @Override
+    public Map<byte[], Long> getOutputCounts() {
+      Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
+      for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
+        regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
+      }
+      return regionRecoverStatMapResult;
+    }
+
+    /**
+     * @return the number of recovered regions
+     */
+    @Override
+    public int getNumberOfRecoveredRegions() {
+      return regionRecoverStatMap.size();
+    }
+
+    /**
+     * Append the buffer to a new recovered edits file, then close it when done.
+     * @param buffer contains all entries of a certain region
+     * @throws IOException when closeWriter fails
+     */
+    @Override
+    public void append(RegionEntryBuffer buffer) throws IOException {
+      writeThenClose(buffer);
+    }
+
+    private Path writeThenClose(RegionEntryBuffer buffer) throws IOException {
+      WriterAndPath wap = appendBuffer(buffer, false);
+      if (wap != null) {
+        String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
+        Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
+        if (value != null) {
+          Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
+          regionRecoverStatMap.put(encodedRegionName, newValue);
+        }
+      }
+
+      Path dst = null;
+      List<IOException> thrown = new ArrayList<>();
+      if (wap != null) {
+        dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
+      }
+      if (!thrown.isEmpty()) {
+        throw MultipleIOException.createIOException(thrown);
+      }
+      return dst;
+    }
+  }
+
   /**
    * Class wraps the actual writer which writes data out and related statistics
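One design note on writeThenClose above: the putIfAbsent/get/put sequence is not atomic as a whole; since Java 8, ConcurrentHashMap.merge expresses the same read-modify-write in one atomic call. A standalone sketch of the equivalent bookkeeping (hypothetical values, not the patch itself):

import java.util.concurrent.ConcurrentHashMap;

public class RegionStatSketch {
  public static void main(String[] args) {
    ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();
    // Two flushes for the same region, as writeThenClose might record them.
    regionRecoverStatMap.merge("region1", 17L, Long::sum);
    regionRecoverStatMap.merge("region1", 5L, Long::sum);
    // Prints 22: merge performs the read-modify-write atomically.
    System.out.println(regionRecoverStatMap.get("region1"));
  }
}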
*/ */

File: hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java

@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver.wal;
+
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.wal.WALSplitter;
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+
+@Category(MediumTests.class)
+public class TestWALReplayBoundedLogWriterCreation extends TestWALReplay {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALReplay.setUpBeforeClass();
+    TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
+  }
+}

File: hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java

@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.junit.BeforeClass;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category(LargeTests.class)
+public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit {
+  @BeforeClass
+  public static void setUpBeforeClass() throws Exception {
+    TestWALSplit.setUpBeforeClass();
+    TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
+  }
+
+  /**
+   * The logic of this test conflicts with the bounded-writer split logic, so it is skipped.
+   */
+  @Test(timeout=300000)
+  @Ignore
+  public void testThreadingSlowWriterSmallBuffer() throws Exception {
+    super.testThreadingSlowWriterSmallBuffer();
+  }
+}