diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 4af782dfc55..22f7e1a9003 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -18,11 +18,6 @@ */ package org.apache.hadoop.hbase.wal; -import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; -import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; - import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; @@ -31,6 +26,7 @@ import java.text.ParseException; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.NavigableSet; @@ -71,6 +67,7 @@ import org.apache.hadoop.hbase.client.Delete; import org.apache.hadoop.hbase.client.Durability; import org.apache.hadoop.hbase.client.Mutation; import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.log.HBaseMarkers; import org.apache.hadoop.hbase.master.SplitLogManager; @@ -80,12 +77,6 @@ import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.LastSequenceId; import org.apache.hadoop.hbase.regionserver.wal.AbstractFSWAL; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; -import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; -import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; -import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; import org.apache.hadoop.hbase.util.ClassSize; @@ -101,6 +92,16 @@ import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; +import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat; +import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds; +import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.StoreSequenceId; +import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos; /** * This class is responsible for splitting up a bunch of regionserver commit log * files that are no longer being written to, into new files, one per region, for @@ -140,6 +141,12 @@ public class WALSplitter { // the file being split currently private FileStatus fileBeingSplit; + // if we limit the number of writers opened for sinking recovered edits + private final boolean splitWriterCreationBounded; + + public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; + + @VisibleForTesting WALSplitter(final WALFactory factory, Configuration conf, Path rootDir, FileSystem fs, LastSequenceId idChecker, @@ -156,11 +163,19 @@ public class WALSplitter { this.walFactory = factory; PipelineController controller = new PipelineController(); + this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); + entryBuffers = new EntryBuffers(controller, - this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024)); + this.conf.getInt("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024), + splitWriterCreationBounded); int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); - outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads); + if(splitWriterCreationBounded){ + outputSink = new BoundedLogWriterCreationOutputSink( + controller, entryBuffers, numWriterThreads); + }else { + outputSink = new LogRecoveredEditsOutputSink(controller, entryBuffers, numWriterThreads); + } } /** @@ -833,10 +848,17 @@ public class WALSplitter { long totalBuffered = 0; long maxHeapUsage; + boolean splitWriterCreationBounded; public EntryBuffers(PipelineController controller, long maxHeapUsage) { + this(controller, maxHeapUsage, false); + } + + public EntryBuffers(PipelineController controller, long maxHeapUsage, + boolean splitWriterCreationBounded){ this.controller = controller; this.maxHeapUsage = maxHeapUsage; + this.splitWriterCreationBounded = splitWriterCreationBounded; } /** @@ -876,6 +898,13 @@ public class WALSplitter { * @return RegionEntryBuffer a buffer of edits to be written. */ synchronized RegionEntryBuffer getChunkToWrite() { + // The core part of limiting opening writers is it doesn't return chunk only if the + // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each + // region during splitting. It will flush all the logs in the buffer after splitting + // through a threadpool, which means the number of writers it created is under control. + if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) { + return null; + } long biggestSize = 0; byte[] biggestBufferKey = null; @@ -1054,11 +1083,10 @@ public class WALSplitter { protected PipelineController controller; protected EntryBuffers entryBuffers; - protected Map writers = Collections - .synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR));; + protected ConcurrentHashMap writers = new ConcurrentHashMap<>(); + protected ConcurrentHashMap regionMaximumEditLogSeqNum = + new ConcurrentHashMap<>(); - protected final Map regionMaximumEditLogSeqNum = Collections - .synchronizedMap(new TreeMap(Bytes.BYTES_COMPARATOR)); protected final List writerThreads = Lists.newArrayList(); @@ -1105,11 +1133,10 @@ public class WALSplitter { */ void updateRegionMaximumEditLogSeqNum(Entry entry) { synchronized (regionMaximumEditLogSeqNum) { - Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(entry.getKey() - .getEncodedRegionName()); + String regionName = Bytes.toString(entry.getKey().getEncodedRegionName()); + Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName); if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { - regionMaximumEditLogSeqNum.put(entry.getKey().getEncodedRegionName(), entry.getKey() - .getSequenceId()); + regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId()); } } } @@ -1266,87 +1293,24 @@ public class WALSplitter { * Close all of the output streams. * @return the list of paths written. */ - private List close() throws IOException { + List close() throws IOException { Preconditions.checkState(!closeAndCleanCompleted); final List paths = new ArrayList<>(); final List thrown = Lists.newArrayList(); - ThreadPoolExecutor closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, - TimeUnit.SECONDS, new ThreadFactory() { - private int count = 1; + ThreadPoolExecutor closeThreadPool = Threads + .getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { + private int count = 1; - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "split-log-closeStream-" + count++); - return t; - } - }); + @Override public Thread newThread(Runnable r) { + Thread t = new Thread(r, "split-log-closeStream-" + count++); + return t; + } + }); CompletionService completionService = new ExecutorCompletionService<>(closeThreadPool); - for (final Map.Entry writersEntry : writers.entrySet()) { - if (LOG.isTraceEnabled()) { - LOG.trace("Submitting close of " + ((WriterAndPath)writersEntry.getValue()).p); - } - completionService.submit(new Callable() { - @Override - public Void call() throws Exception { - WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); - if (LOG.isTraceEnabled()) LOG.trace("Closing " + wap.p); - try { - wap.w.close(); - } catch (IOException ioe) { - LOG.error("Couldn't close log at " + wap.p, ioe); - thrown.add(ioe); - return null; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten - + " edits, skipped " + wap.editsSkipped + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms"); - } - if (wap.editsWritten == 0) { - // just remove the empty recovered.edits file - if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { - LOG.warn("Failed deleting empty " + wap.p); - throw new IOException("Failed deleting empty " + wap.p); - } - return null; - } - - Path dst = getCompletedRecoveredEditsFilePath(wap.p, - regionMaximumEditLogSeqNum.get(writersEntry.getKey())); - try { - if (!dst.equals(wap.p) && fs.exists(dst)) { - deleteOneWithFewerEntries(wap, dst); - } - // Skip the unit tests which create a splitter that reads and - // writes the data without touching disk. - // TestHLogSplit#testThreading is an example. - if (fs.exists(wap.p)) { - if (!fs.rename(wap.p, dst)) { - throw new IOException("Failed renaming " + wap.p + " to " + dst); - } - LOG.info("Rename " + wap.p + " to " + dst); - } - } catch (IOException ioe) { - LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe); - thrown.add(ioe); - return null; - } - paths.add(dst); - return null; - } - }); - } - - boolean progress_failed = false; + boolean progress_failed; try { - for (int i = 0, n = this.writers.size(); i < n; i++) { - Future future = completionService.take(); - future.get(); - if (!progress_failed && reporter != null && !reporter.progress()) { - progress_failed = true; - } - } + progress_failed = executeCloseTask(completionService, thrown, paths); } catch (InterruptedException e) { IOException iie = new InterruptedIOException(); iie.initCause(e); @@ -1356,7 +1320,6 @@ public class WALSplitter { } finally { closeThreadPool.shutdownNow(); } - if (!thrown.isEmpty()) { throw MultipleIOException.createIOException(thrown); } @@ -1368,6 +1331,88 @@ public class WALSplitter { return paths; } + /** + * @param completionService threadPool to execute the closing tasks + * @param thrown store the exceptions + * @param paths arrayList to store the paths written + * @return if close tasks executed successful + */ + boolean executeCloseTask(CompletionService completionService, + List thrown, List paths) + throws InterruptedException, ExecutionException { + for (final Map.Entry writersEntry : writers.entrySet()) { + if (LOG.isTraceEnabled()) { + LOG.trace("Submitting close of " + ((WriterAndPath) writersEntry.getValue()).p); + } + completionService.submit(new Callable() { + @Override public Void call() throws Exception { + WriterAndPath wap = (WriterAndPath) writersEntry.getValue(); + Path dst = closeWriter(writersEntry.getKey(), wap, thrown); + paths.add(dst); + return null; + } + }); + } + boolean progress_failed = false; + for (int i = 0, n = this.writers.size(); i < n; i++) { + Future future = completionService.take(); + future.get(); + if (!progress_failed && reporter != null && !reporter.progress()) { + progress_failed = true; + } + } + return progress_failed; + } + + Path closeWriter(String encodedRegionName, WriterAndPath wap, + List thrown) throws IOException{ + if (LOG.isTraceEnabled()) { + LOG.trace("Closing " + wap.p); + } + try { + wap.w.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close log at " + wap.p, ioe); + thrown.add(ioe); + return null; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Closed wap " + wap.p + " (wrote " + wap.editsWritten + + " edits, skipped " + wap.editsSkipped + " edits in " + + (wap.nanosSpent / 1000 / 1000) + "ms"); + } + if (wap.editsWritten == 0) { + // just remove the empty recovered.edits file + if (fs.exists(wap.p) && !fs.delete(wap.p, false)) { + LOG.warn("Failed deleting empty " + wap.p); + throw new IOException("Failed deleting empty " + wap.p); + } + return null; + } + + Path dst = getCompletedRecoveredEditsFilePath(wap.p, + regionMaximumEditLogSeqNum.get(encodedRegionName)); + try { + if (!dst.equals(wap.p) && fs.exists(dst)) { + deleteOneWithFewerEntries(wap, dst); + } + // Skip the unit tests which create a splitter that reads and + // writes the data without touching disk. + // TestHLogSplit#testThreading is an example. + if (fs.exists(wap.p)) { + if (!fs.rename(wap.p, dst)) { + throw new IOException("Failed renaming " + wap.p + " to " + dst); + } + LOG.info("Rename " + wap.p + " to " + dst); + } + } catch (IOException ioe) { + LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe); + thrown.add(ioe); + return null; + } + return dst; + } + private List closeLogWriters(List thrown) throws IOException { if (writersClosed) { return thrown; @@ -1390,20 +1435,19 @@ public class WALSplitter { } } } finally { - synchronized (writers) { - WriterAndPath wap = null; - for (SinkWriter tmpWAP : writers.values()) { - try { - wap = (WriterAndPath) tmpWAP; - wap.w.close(); - } catch (IOException ioe) { - LOG.error("Couldn't close log at {}", wap.p, ioe); - thrown.add(ioe); - continue; - } - LOG.info("Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms)"); + WriterAndPath wap = null; + for (SinkWriter tmpWAP : writers.values()) { + try { + wap = (WriterAndPath) tmpWAP; + wap.w.close(); + } catch (IOException ioe) { + LOG.error("Couldn't close log at " + wap.p, ioe); + thrown.add(ioe); + continue; } + LOG.info( + "Closed log " + wap.p + " (wrote " + wap.editsWritten + " edits in " + (wap.nanosSpent + / 1000 / 1000) + "ms)"); } writersClosed = true; } @@ -1416,9 +1460,10 @@ public class WALSplitter { * long as multiple threads are always acting on different regions. * @return null if this region shouldn't output any logs */ - private WriterAndPath getWriterAndPath(Entry entry) throws IOException { + WriterAndPath getWriterAndPath(Entry entry, boolean reusable) throws IOException { byte region[] = entry.getKey().getEncodedRegionName(); - WriterAndPath ret = (WriterAndPath) writers.get(region); + String regionName = Bytes.toString(region); + WriterAndPath ret = (WriterAndPath) writers.get(regionName); if (ret != null) { return ret; } @@ -1432,14 +1477,16 @@ public class WALSplitter { blacklistedRegions.add(region); return null; } - writers.put(region, ret); + if(reusable) { + writers.put(regionName, ret); + } return ret; } /** * @return a path with a write for that path. caller should close. */ - private WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException { + WriterAndPath createWAP(byte[] region, Entry entry, Path rootdir) throws IOException { Path regionedits = getRegionSplitEditsPath(fs, entry, rootdir, fileBeingSplit.getPath().getName()); if (regionedits == null) { return null; @@ -1457,7 +1504,7 @@ public class WALSplitter { return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId()); } - private void filterCellByStore(Entry logEntry) { + void filterCellByStore(Entry logEntry) { Map maxSeqIdInStores = regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); if (MapUtils.isEmpty(maxSeqIdInStores)) { @@ -1488,10 +1535,14 @@ public class WALSplitter { @Override public void append(RegionEntryBuffer buffer) throws IOException { + appendBuffer(buffer, true); + } + + WriterAndPath appendBuffer(RegionEntryBuffer buffer, boolean reusable) throws IOException{ List entries = buffer.entryBuffer; if (entries.isEmpty()) { LOG.warn("got an empty buffer, skipping"); - return; + return null; } WriterAndPath wap = null; @@ -1502,14 +1553,14 @@ public class WALSplitter { for (Entry logEntry : entries) { if (wap == null) { - wap = getWriterAndPath(logEntry); + wap = getWriterAndPath(logEntry, reusable); if (wap == null) { if (LOG.isTraceEnabled()) { // This log spews the full edit. Can be massive in the log. Enable only debugging // WAL lost edit issues. LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry); } - return; + return null; } } filterCellByStore(logEntry); @@ -1530,6 +1581,7 @@ public class WALSplitter { LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e); throw e; } + return wap; } @Override @@ -1549,10 +1601,8 @@ public class WALSplitter { @Override public Map getOutputCounts() { TreeMap ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); - synchronized (writers) { - for (Map.Entry entry : writers.entrySet()) { - ret.put(entry.getKey(), entry.getValue().editsWritten); - } + for (Map.Entry entry : writers.entrySet()) { + ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten); } return ret; } @@ -1563,6 +1613,114 @@ public class WALSplitter { } } + /** + * + */ + class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink { + + private ConcurrentHashMap regionRecoverStatMap = new ConcurrentHashMap<>(); + + public BoundedLogWriterCreationOutputSink(PipelineController controller, + EntryBuffers entryBuffers, int numWriters) { + super(controller, entryBuffers, numWriters); + } + + @Override + public List finishWritingAndClose() throws IOException { + boolean isSuccessful; + List result; + try { + isSuccessful = finishWriting(false); + } finally { + result = close(); + } + if (isSuccessful) { + splits = result; + } + return splits; + } + + @Override + boolean executeCloseTask(CompletionService completionService, + List thrown, List paths) + throws InterruptedException, ExecutionException { + for (final Map.Entry buffer : entryBuffers.buffers.entrySet()) { + LOG.info("Submitting writeThenClose of " + buffer.getValue().encodedRegionName); + completionService.submit(new Callable() { + public Void call() throws Exception { + Path dst = writeThenClose(buffer.getValue()); + paths.add(dst); + return null; + } + }); + } + boolean progress_failed = false; + for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { + Future future = completionService.take(); + future.get(); + if (!progress_failed && reporter != null && !reporter.progress()) { + progress_failed = true; + } + } + + return progress_failed; + } + + /** + * since the splitting process may create multiple output files, we need a map + * regionRecoverStatMap to track the output count of each region. + * @return a map from encoded region ID to the number of edits written out for that region. + */ + @Override + public Map getOutputCounts() { + Map regionRecoverStatMapResult = new HashMap<>(); + for(Map.Entry entry: regionRecoverStatMap.entrySet()){ + regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue()); + } + return regionRecoverStatMapResult; + } + + /** + * @return the number of recovered regions + */ + @Override + public int getNumberOfRecoveredRegions() { + return regionRecoverStatMap.size(); + } + + /** + * Append the buffer to a new recovered edits file, then close it after all done + * @param buffer contain all entries of a certain region + * @throws IOException when closeWriter failed + */ + @Override + public void append(RegionEntryBuffer buffer) throws IOException { + writeThenClose(buffer); + } + + private Path writeThenClose(RegionEntryBuffer buffer) throws IOException { + WriterAndPath wap = appendBuffer(buffer, false); + if(wap != null) { + String encodedRegionName = Bytes.toString(buffer.encodedRegionName); + Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten); + if (value != null) { + Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten; + regionRecoverStatMap.put(encodedRegionName, newValue); + } + } + + Path dst = null; + List thrown = new ArrayList<>(); + if(wap != null){ + dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown); + } + if (!thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + return dst; + } + } + /** * Class wraps the actual writer which writes data out and related statistics */ diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java new file mode 100644 index 00000000000..55bbeafad32 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/wal/TestWALReplayBoundedLogWriterCreation.java @@ -0,0 +1,35 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.regionserver.wal; + +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.apache.hadoop.hbase.wal.WALSplitter; +import org.junit.BeforeClass; +import org.junit.experimental.categories.Category; + + +@Category(MediumTests.class) +public class TestWALReplayBoundedLogWriterCreation extends TestWALReplay { + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestWALReplay.setUpBeforeClass(); + TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true); + } +} + diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java new file mode 100644 index 00000000000..844cb3a27e2 --- /dev/null +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplitBoundedLogWriterCreation.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hbase.wal; + +import org.apache.hadoop.hbase.testclassification.LargeTests; +import org.junit.BeforeClass; +import org.junit.Ignore; +import org.junit.Test; +import org.junit.experimental.categories.Category; + +@Category(LargeTests.class) +public class TestWALSplitBoundedLogWriterCreation extends TestWALSplit{ + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TestWALSplit.setUpBeforeClass(); + TEST_UTIL.getConfiguration().setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true); + } + + /** + * The logic of this test has conflict with the limit writers split logic, skip this test + */ + @Test(timeout=300000) + @Ignore + public void testThreadingSlowWriterSmallBuffer() throws Exception { + super.testThreadingSlowWriterSmallBuffer(); + } +} +