diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java new file mode 100644 index 00000000000..da952eb5a3f --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java @@ -0,0 +1,270 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath; +import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath; + +import java.io.EOFException; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.Cell; +import org.apache.hadoop.hbase.CellUtil; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.log.HBaseMarkers; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; + +@InterfaceAudience.Private +abstract class AbstractRecoveredEditsOutputSink extends OutputSink { + private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class); + private final WALSplitter walSplitter; + private final ConcurrentMap regionMaximumEditLogSeqNum = new ConcurrentHashMap<>(); + + public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter, + WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { + super(controller, entryBuffers, numWriters); + this.walSplitter = walSplitter; + } + + /** + * @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close. + */ + protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region, + long seqId) throws IOException { + Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId, + walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(), + walSplitter.conf); + if (walSplitter.walFS.exists(regionEditsPath)) { + LOG.warn("Found old edits file. It could be the " + + "result of a previous failed split attempt. 
Deleting " + regionEditsPath + ", length=" + + walSplitter.walFS.getFileStatus(regionEditsPath).getLen()); + if (!walSplitter.walFS.delete(regionEditsPath, false)) { + LOG.warn("Failed delete of old {}", regionEditsPath); + } + } + WALProvider.Writer w = walSplitter.createWriter(regionEditsPath); + LOG.info("Creating recovered edits writer path={}", regionEditsPath); + return new RecoveredEditsWriter(region, regionEditsPath, w, seqId); + } + + protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter, + List thrown) throws IOException { + try { + editsWriter.writer.close(); + } catch (IOException ioe) { + LOG.error("Could not close recovered edits at {}", editsWriter.path, ioe); + thrown.add(ioe); + return null; + } + LOG.info("Closed recovered edits writer path={} (wrote {} edits, skipped {} edits in {} ms", + editsWriter.path, editsWriter.editsWritten, editsWriter.editsSkipped, + editsWriter.nanosSpent / 1000 / 1000); + if (editsWriter.editsWritten == 0) { + // just remove the empty recovered.edits file + if (walSplitter.walFS.exists(editsWriter.path) && + !walSplitter.walFS.delete(editsWriter.path, false)) { + LOG.warn("Failed deleting empty {}", editsWriter.path); + throw new IOException("Failed deleting empty " + editsWriter.path); + } + return null; + } + + Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path, + regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName))); + try { + if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) { + deleteOneWithFewerEntries(editsWriter, dst); + } + // Skip the unit tests which create a splitter that reads and + // writes the data without touching disk. + // TestHLogSplit#testThreading is an example. + if (walSplitter.walFS.exists(editsWriter.path)) { + if (!walSplitter.walFS.rename(editsWriter.path, dst)) { + throw new IOException( + "Failed renaming recovered edits " + editsWriter.path + " to " + dst); + } + LOG.info("Rename recovered edits {} to {}", editsWriter.path, dst); + } + } catch (IOException ioe) { + LOG.error("Could not rename recovered edits {} to {}", editsWriter.path, dst, ioe); + thrown.add(ioe); + return null; + } + return dst; + } + + @Override + public boolean keepRegionEvent(WAL.Entry entry) { + ArrayList cells = entry.getEdit().getCells(); + for (Cell cell : cells) { + if (WALEdit.isCompactionMarker(cell)) { + return true; + } + } + return false; + } + + /** + * Update region's maximum edit log SeqNum. + */ + void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) { + synchronized (regionMaximumEditLogSeqNum) { + String regionName = Bytes.toString(entry.getKey().getEncodedRegionName()); + Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName); + if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { + regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId()); + } + } + } + + // delete the one with fewer wal entries + private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst) + throws IOException { + long dstMinLogSeqNum = -1L; + try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) { + WAL.Entry entry = reader.next(); + if (entry != null) { + dstMinLogSeqNum = entry.getKey().getSequenceId(); + } + } catch (EOFException e) { + LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst, + e); + } + if (editsWriter.minLogSeqNum < dstMinLogSeqNum) { + LOG.warn("Found existing old edits file. 
It could be the result of a previous failed" + + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" + + walSplitter.walFS.getFileStatus(dst).getLen()); + if (!walSplitter.walFS.delete(dst, false)) { + LOG.warn("Failed deleting of old {}", dst); + throw new IOException("Failed deleting of old " + dst); + } + } else { + LOG.warn( + "Found existing old edits file and we have less entries. Deleting " + editsWriter.path + + ", length=" + walSplitter.walFS.getFileStatus(editsWriter.path).getLen()); + if (!walSplitter.walFS.delete(editsWriter.path, false)) { + LOG.warn("Failed deleting of {}", editsWriter.path); + throw new IOException("Failed deleting of " + editsWriter.path); + } + } + } + + /** + * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting + * statistics about the data written to this output. + */ + final class RecoveredEditsWriter { + /* Count of edits written to this path */ + long editsWritten = 0; + /* Count of edits skipped to this path */ + long editsSkipped = 0; + /* Number of nanos spent writing to this log */ + long nanosSpent = 0; + + final byte[] encodedRegionName; + final Path path; + final WALProvider.Writer writer; + final long minLogSeqNum; + + RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer, + long minLogSeqNum) { + this.encodedRegionName = encodedRegionName; + this.path = path; + this.writer = writer; + this.minLogSeqNum = minLogSeqNum; + } + + private void incrementEdits(int edits) { + editsWritten += edits; + } + + private void incrementSkippedEdits(int skipped) { + editsSkipped += skipped; + totalSkippedEdits.addAndGet(skipped); + } + + private void incrementNanoTime(long nanos) { + nanosSpent += nanos; + } + + void writeRegionEntries(List entries) throws IOException { + long startTime = System.nanoTime(); + try { + int editsCount = 0; + for (WAL.Entry logEntry : entries) { + filterCellByStore(logEntry); + if (!logEntry.getEdit().isEmpty()) { + writer.append(logEntry); + updateRegionMaximumEditLogSeqNum(logEntry); + editsCount++; + } else { + incrementSkippedEdits(1); + } + } + // Pass along summary statistics + incrementEdits(editsCount); + incrementNanoTime(System.nanoTime() - startTime); + } catch (IOException e) { + e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; + LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e); + throw e; + } + } + + private void filterCellByStore(WAL.Entry logEntry) { + Map maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores() + .get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); + if (MapUtils.isEmpty(maxSeqIdInStores)) { + return; + } + // Create the array list for the cells that aren't filtered. + // We make the assumption that most cells will be kept. + ArrayList keptCells = new ArrayList<>(logEntry.getEdit().getCells().size()); + for (Cell cell : logEntry.getEdit().getCells()) { + if (WALEdit.isMetaEditFamily(cell)) { + keptCells.add(cell); + } else { + byte[] family = CellUtil.cloneFamily(cell); + Long maxSeqId = maxSeqIdInStores.get(family); + // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, + // or the master was crashed before and we can not get the information. + if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) { + keptCells.add(cell); + } + } + } + + // Anything in the keptCells array list is still live. 
+ // So rather than removing the cells from the array list + // which would be an O(n^2) operation, we just replace the list + logEntry.getEdit().setCells(keptCells); + } + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java new file mode 100644 index 00000000000..ed3c8b7f3e2 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Used for {@link BoundedRecoveredEditsOutputSink}. The key to bounding the number of opened writers is + * that {@link #getChunkToWrite()} returns a chunk only once the buffered heap size exceeds maxHeapUsage, + * so a writer does not have to be created for each region during splitting. Each returned + * {@link EntryBuffers.RegionEntryBuffer} is written out to a recovered edits file and its writer is closed immediately. + * See {@link BoundedRecoveredEditsOutputSink#append(EntryBuffers.RegionEntryBuffer)} for more + * details. + */ +@InterfaceAudience.Private +public class BoundedEntryBuffers extends EntryBuffers { + + public BoundedEntryBuffers(WALSplitter.PipelineController controller, long maxHeapUsage) { + super(controller, maxHeapUsage); + } + + @Override + synchronized RegionEntryBuffer getChunkToWrite() { + if (totalBuffered < maxHeapUsage) { + return null; + } + return super.getChunkToWrite(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java deleted file mode 100644 index 77b8f936852..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.wal; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Future; - -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.io.MultipleIOException; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Class that manages the output streams from the log splitting process. - * Bounded means the output streams will be no more than the size of threadpool - */ -@InterfaceAudience.Private -public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink { - private static final Logger LOG = - LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class); - - private ConcurrentHashMap regionRecoverStatMap = new ConcurrentHashMap<>(); - - public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter, - WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { - super(walSplitter, controller, entryBuffers, numWriters); - } - - @Override - public List finishWritingAndClose() throws IOException { - boolean isSuccessful; - List result; - try { - isSuccessful = finishWriting(false); - } finally { - result = close(); - } - if (isSuccessful) { - splits = result; - } - return splits; - } - - @Override - boolean executeCloseTask(CompletionService completionService, List thrown, - List paths) throws InterruptedException, ExecutionException { - for (final Map.Entry buffer : entryBuffers.buffers - .entrySet()) { - LOG.info("Submitting writeThenClose of {}", - Bytes.toString(buffer.getValue().encodedRegionName)); - completionService.submit(new Callable() { - @Override - public Void call() throws Exception { - Path dst = writeThenClose(buffer.getValue()); - paths.add(dst); - return null; - } - }); - } - boolean progress_failed = false; - for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { - Future future = completionService.take(); - future.get(); - if (!progress_failed && reporter != null && !reporter.progress()) { - progress_failed = true; - } - } - - return progress_failed; - } - - /** - * since the splitting process may create multiple output files, we need a map - * regionRecoverStatMap to track the output count of each region. - * @return a map from encoded region ID to the number of edits written out for that region. 
- */ - @Override - public Map<byte[], Long> getOutputCounts() { - Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>(); - for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) { - regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue()); - } - return regionRecoverStatMapResult; - } - - /** - * @return the number of recovered regions - */ - @Override - public int getNumberOfRecoveredRegions() { - return regionRecoverStatMap.size(); - } - - /** - * Append the buffer to a new recovered edits file, then close it after all done - * @param buffer contain all entries of a certain region - * @throws IOException when closeWriter failed - */ - @Override - public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException { - writeThenClose(buffer); - } - - private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException { - WALSplitter.WriterAndPath wap = appendBuffer(buffer, false); - if (wap != null) { - String encodedRegionName = Bytes.toString(buffer.encodedRegionName); - Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten); - if (value != null) { - Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten; - regionRecoverStatMap.put(encodedRegionName, newValue); - } - } - - Path dst = null; - List<IOException> thrown = new ArrayList<>(); - if (wap != null) { - dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown); - } - if (!thrown.isEmpty()) { - throw MultipleIOException.createIOException(thrown); - } - return dst; - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java new file mode 100644 index 00000000000..795192b3fb7 --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java @@ -0,0 +1,141 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Class that manages the output streams from the log splitting process. + * Every region may have many recovered edits files, but the number of writers opened at any time is + * bounded: the open output streams will be no more than the size of the thread pool. 
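+ * This works because {@link BoundedEntryBuffers} hands out a chunk only once the buffered heap size
+ * crosses the configured threshold, and {@link #append(EntryBuffers.RegionEntryBuffer)} creates a writer,
+ * flushes the chunk, and closes the writer again within the same call.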
+ */ +@InterfaceAudience.Private +class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink { + private static final Logger LOG = + LoggerFactory.getLogger(BoundedRecoveredEditsOutputSink.class); + + // Since the splitting process may create multiple output files, we need a map + // to track the output count of each region. + private ConcurrentHashMap regionEditsWrittenMap = new ConcurrentHashMap<>(); + // Need a counter to track the opening writers. + private final AtomicInteger openingWritersNum = new AtomicInteger(0); + + public BoundedRecoveredEditsOutputSink(WALSplitter walSplitter, + WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { + super(walSplitter, controller, entryBuffers, numWriters); + } + + @Override + public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { + List entries = buffer.entryBuffer; + if (entries.isEmpty()) { + LOG.warn("got an empty buffer, skipping"); + return; + } + // The key point is create a new writer, write edits then close writer. + RecoveredEditsWriter writer = + createRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName, + entries.get(0).getKey().getSequenceId()); + if (writer != null) { + openingWritersNum.incrementAndGet(); + writer.writeRegionEntries(entries); + regionEditsWrittenMap.compute(buffer.encodedRegionName, + (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten); + List thrown = new ArrayList<>(); + Path dst = closeRecoveredEditsWriter(writer, thrown); + splits.add(dst); + openingWritersNum.decrementAndGet(); + if (!thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + } + } + + @Override + public List close() throws IOException { + boolean isSuccessful = true; + try { + isSuccessful &= finishWriterThreads(); + } finally { + isSuccessful &= writeRemainingEntryBuffers(); + } + return isSuccessful ? splits : null; + } + + /** + * Write out the remaining RegionEntryBuffers and close the writers. + * + * @return true when there is no error. 
+ */ + private boolean writeRemainingEntryBuffers() throws IOException { + for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) { + closeCompletionService.submit(() -> { + append(buffer); + return null; + }); + } + boolean progressFailed = false; + try { + for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) { + Future future = closeCompletionService.take(); + future.get(); + if (!progressFailed && reporter != null && !reporter.progress()) { + progressFailed = true; + } + } + } catch (InterruptedException e) { + IOException iie = new InterruptedIOException(); + iie.initCause(e); + throw iie; + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + closeThreadPool.shutdownNow(); + } + return !progressFailed; + } + + @Override + public Map getOutputCounts() { + return regionEditsWrittenMap; + } + + @Override + public int getNumberOfRecoveredRegions() { + return regionEditsWrittenMap.size(); + } + + @Override + int getNumOpenWriters() { + return openingWritersNum.get(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java index f0974be11d8..6348e5cc882 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java @@ -18,58 +18,56 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; import java.util.TreeSet; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; -import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; import org.apache.yetus.audience.InterfaceAudience; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting; + /** * Class which accumulates edits and separates them into a buffer per region while simultaneously * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then * pull region-specific buffers from this class. */ @InterfaceAudience.Private -public class EntryBuffers { +class EntryBuffers { private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class); - PipelineController controller; + private final PipelineController controller; - Map buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR); + final Map buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR); /* * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick * up bytes from a region if we're already writing data for that region in a different IO thread. 
*/ - Set currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR); + private final Set currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR); - long totalBuffered = 0; - long maxHeapUsage; - boolean splitWriterCreationBounded; + protected long totalBuffered = 0; + protected final long maxHeapUsage; public EntryBuffers(PipelineController controller, long maxHeapUsage) { - this(controller, maxHeapUsage, false); - } - - public EntryBuffers(PipelineController controller, long maxHeapUsage, - boolean splitWriterCreationBounded) { this.controller = controller; this.maxHeapUsage = maxHeapUsage; - this.splitWriterCreationBounded = splitWriterCreationBounded; } /** * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has * crossed the specified threshold. */ - public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException { + void appendEntry(WAL.Entry entry) throws InterruptedException, IOException { WALKey key = entry.getKey(); RegionEntryBuffer buffer; long incrHeap; @@ -98,13 +96,6 @@ public class EntryBuffers { * @return RegionEntryBuffer a buffer of edits to be written. */ synchronized RegionEntryBuffer getChunkToWrite() { - // The core part of limiting opening writers is it doesn't return chunk only if the - // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each - // region during splitting. It will flush all the logs in the buffer after splitting - // through a threadpool, which means the number of writers it created is under control. - if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) { - return null; - } long biggestSize = 0; byte[] biggestBufferKey = null; @@ -138,21 +129,56 @@ public class EntryBuffers { } } + @VisibleForTesting synchronized boolean isRegionCurrentlyWriting(byte[] region) { return currentlyWriting.contains(region); } - public void waitUntilDrained() { - synchronized (controller.dataAvailable) { - while (totalBuffered > 0) { - try { - controller.dataAvailable.wait(2000); - } catch (InterruptedException e) { - LOG.warn("Got interrupted while waiting for EntryBuffers is drained"); - Thread.interrupted(); - break; - } - } + /** + * A buffer of some number of edits for a given region. + * This accumulates edits and also provides a memory optimization in order to + * share a single byte array instance for the table and region name. + * Also tracks memory usage of the accumulated edits. 
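+ * The tracked heap size is what {@link EntryBuffers} accumulates into {@code totalBuffered}, which is
+ * used to decide when appending threads must block and when {@link BoundedEntryBuffers} may hand a
+ * chunk to the writer threads.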
+ */ + static class RegionEntryBuffer implements HeapSize { + private long heapInBuffer = 0; + final List entryBuffer; + final TableName tableName; + final byte[] encodedRegionName; + + RegionEntryBuffer(TableName tableName, byte[] region) { + this.tableName = tableName; + this.encodedRegionName = region; + this.entryBuffer = new ArrayList<>(); + } + + long appendEntry(WAL.Entry entry) { + internify(entry); + entryBuffer.add(entry); + // TODO linkedlist entry + long incrHeap = entry.getEdit().heapSize() + + ClassSize.align(2 * ClassSize.REFERENCE); // WALKey pointers + heapInBuffer += incrHeap; + return incrHeap; + } + + private void internify(WAL.Entry entry) { + WALKeyImpl k = entry.getKey(); + k.internTableName(this.tableName); + k.internEncodedRegionName(this.encodedRegionName); + } + + @Override + public long heapSize() { + return heapInBuffer; + } + + public byte[] getEncodedRegionName() { + return encodedRegionName; + } + + public TableName getTableName() { + return tableName; } } -} \ No newline at end of file +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java deleted file mode 100644 index 9fc43b13e0d..00000000000 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java +++ /dev/null @@ -1,460 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.hadoop.hbase.wal; - -import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath; -import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath; - -import java.io.EOFException; -import java.io.IOException; -import java.io.InterruptedIOException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.TreeMap; -import java.util.concurrent.Callable; -import java.util.concurrent.CompletionService; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorCompletionService; -import java.util.concurrent.Future; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.Cell; -import org.apache.hadoop.hbase.CellUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.log.HBaseMarkers; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.FSUtils; -import org.apache.hadoop.hbase.util.Threads; -import org.apache.hadoop.io.MultipleIOException; -import org.apache.hadoop.ipc.RemoteException; -import org.apache.yetus.audience.InterfaceAudience; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import org.apache.hbase.thirdparty.com.google.common.base.Preconditions; -import org.apache.hbase.thirdparty.com.google.common.collect.Lists; -import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils; -import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils; - -/** - * Class that manages the output streams from the log splitting process. - */ -@InterfaceAudience.Private -public class LogRecoveredEditsOutputSink extends OutputSink { - private static final Logger LOG = LoggerFactory.getLogger(LogRecoveredEditsOutputSink.class); - private WALSplitter walSplitter; - private FileSystem walFS; - private Configuration conf; - - public LogRecoveredEditsOutputSink(WALSplitter walSplitter, - WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { - // More threads could potentially write faster at the expense - // of causing more disk seeks as the logs are split. - // 3. After a certain setting (probably around 3) the - // process will be bound on the reader in the current - // implementation anyway. 
- super(controller, entryBuffers, numWriters); - this.walSplitter = walSplitter; - this.walFS = walSplitter.walFS; - this.conf = walSplitter.conf; - } - - /** - * @return null if failed to report progress - */ - @Override - public List finishWritingAndClose() throws IOException { - boolean isSuccessful = false; - List result = null; - try { - isSuccessful = finishWriting(false); - } finally { - result = close(); - List thrown = closeLogWriters(null); - if (CollectionUtils.isNotEmpty(thrown)) { - throw MultipleIOException.createIOException(thrown); - } - } - if (isSuccessful) { - splits = result; - } - return splits; - } - - // delete the one with fewer wal entries - private void deleteOneWithFewerEntries(WALSplitter.WriterAndPath wap, Path dst) - throws IOException { - long dstMinLogSeqNum = -1L; - try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) { - WAL.Entry entry = reader.next(); - if (entry != null) { - dstMinLogSeqNum = entry.getKey().getSequenceId(); - } - } catch (EOFException e) { - LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst, - e); - } - if (wap.minLogSeqNum < dstMinLogSeqNum) { - LOG.warn("Found existing old edits file. It could be the result of a previous failed" - + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" - + walFS.getFileStatus(dst).getLen()); - if (!walFS.delete(dst, false)) { - LOG.warn("Failed deleting of old {}", dst); - throw new IOException("Failed deleting of old " + dst); - } - } else { - LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.path - + ", length=" + walFS.getFileStatus(wap.path).getLen()); - if (!walFS.delete(wap.path, false)) { - LOG.warn("Failed deleting of {}", wap.path); - throw new IOException("Failed deleting of " + wap.path); - } - } - } - - /** - * Close all of the output streams. - * @return the list of paths written. 
- */ - List close() throws IOException { - Preconditions.checkState(!closeAndCleanCompleted); - - final List paths = new ArrayList<>(); - final List thrown = Lists.newArrayList(); - ThreadPoolExecutor closeThreadPool = - Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() { - private int count = 1; - - @Override - public Thread newThread(Runnable r) { - Thread t = new Thread(r, "split-log-closeStream-" + count++); - return t; - } - }); - CompletionService completionService = new ExecutorCompletionService<>(closeThreadPool); - boolean progress_failed; - try { - progress_failed = executeCloseTask(completionService, thrown, paths); - } catch (InterruptedException e) { - IOException iie = new InterruptedIOException(); - iie.initCause(e); - throw iie; - } catch (ExecutionException e) { - throw new IOException(e.getCause()); - } finally { - closeThreadPool.shutdownNow(); - } - if (!thrown.isEmpty()) { - throw MultipleIOException.createIOException(thrown); - } - writersClosed = true; - closeAndCleanCompleted = true; - if (progress_failed) { - return null; - } - return paths; - } - - /** - * @param completionService threadPool to execute the closing tasks - * @param thrown store the exceptions - * @param paths arrayList to store the paths written - * @return if close tasks executed successful - */ - boolean executeCloseTask(CompletionService completionService, List thrown, - List paths) throws InterruptedException, ExecutionException { - for (final Map.Entry writersEntry : writers.entrySet()) { - if (LOG.isTraceEnabled()) { - LOG.trace( - "Submitting close of " + ((WALSplitter.WriterAndPath) writersEntry.getValue()).path); - } - completionService.submit(new Callable() { - @Override - public Void call() throws Exception { - WALSplitter.WriterAndPath wap = (WALSplitter.WriterAndPath) writersEntry.getValue(); - Path dst = closeWriter(writersEntry.getKey(), wap, thrown); - paths.add(dst); - return null; - } - }); - } - boolean progress_failed = false; - for (int i = 0, n = this.writers.size(); i < n; i++) { - Future future = completionService.take(); - future.get(); - if (!progress_failed && reporter != null && !reporter.progress()) { - progress_failed = true; - } - } - return progress_failed; - } - - Path closeWriter(String encodedRegionName, WALSplitter.WriterAndPath wap, - List thrown) throws IOException { - LOG.trace("Closing {}", wap.path); - try { - wap.writer.close(); - } catch (IOException ioe) { - LOG.error("Could not close log at {}", wap.path, ioe); - thrown.add(ioe); - return null; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Closed wap " + wap.path + " (wrote " + wap.editsWritten + " edits, skipped " - + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms"); - } - if (wap.editsWritten == 0) { - // just remove the empty recovered.edits file - if (walFS.exists(wap.path) && !walFS.delete(wap.path, false)) { - LOG.warn("Failed deleting empty {}", wap.path); - throw new IOException("Failed deleting empty " + wap.path); - } - return null; - } - - Path dst = getCompletedRecoveredEditsFilePath(wap.path, - regionMaximumEditLogSeqNum.get(encodedRegionName)); - try { - if (!dst.equals(wap.path) && walFS.exists(dst)) { - deleteOneWithFewerEntries(wap, dst); - } - // Skip the unit tests which create a splitter that reads and - // writes the data without touching disk. - // TestHLogSplit#testThreading is an example. 
- if (walFS.exists(wap.path)) { - if (!walFS.rename(wap.path, dst)) { - throw new IOException("Failed renaming " + wap.path + " to " + dst); - } - LOG.info("Rename {} to {}", wap.path, dst); - } - } catch (IOException ioe) { - LOG.error("Could not rename {} to {}", wap.path, dst, ioe); - thrown.add(ioe); - return null; - } - return dst; - } - - private List closeLogWriters(List thrown) throws IOException { - if (writersClosed) { - return thrown; - } - if (thrown == null) { - thrown = Lists.newArrayList(); - } - try { - for (WriterThread writerThread : writerThreads) { - while (writerThread.isAlive()) { - writerThread.setShouldStop(true); - writerThread.interrupt(); - try { - writerThread.join(10); - } catch (InterruptedException e) { - IOException iie = new InterruptedIOException(); - iie.initCause(e); - throw iie; - } - } - } - } finally { - WALSplitter.WriterAndPath wap = null; - for (WALSplitter.SinkWriter tmpWAP : writers.values()) { - try { - wap = (WALSplitter.WriterAndPath) tmpWAP; - wap.writer.close(); - } catch (IOException ioe) { - LOG.error("Couldn't close log at {}", wap.path, ioe); - thrown.add(ioe); - continue; - } - LOG.info("Closed log " + wap.path + " (wrote " + wap.editsWritten + " edits in " - + (wap.nanosSpent / 1000 / 1000) + "ms)"); - } - writersClosed = true; - } - - return thrown; - } - - /** - * Get a writer and path for a log starting at the given entry. This function is threadsafe so - * long as multiple threads are always acting on different regions. - * @return null if this region shouldn't output any logs - */ - WALSplitter.WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException { - byte[] region = entry.getKey().getEncodedRegionName(); - String regionName = Bytes.toString(region); - WALSplitter.WriterAndPath ret = (WALSplitter.WriterAndPath) writers.get(regionName); - if (ret != null) { - return ret; - } - // If we already decided that this region doesn't get any output - // we don't need to check again. - if (blacklistedRegions.contains(region)) { - return null; - } - ret = createWAP(region, entry); - if (ret == null) { - blacklistedRegions.add(region); - return null; - } - if (reusable) { - writers.put(regionName, ret); - } - return ret; - } - - /** - * @return a path with a write for that path. caller should close. - */ - WALSplitter.WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException { - String tmpDirName = walSplitter.conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, - HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - Path regionedits = getRegionSplitEditsPath(entry, - walSplitter.getFileBeingSplit().getPath().getName(), tmpDirName, conf); - if (regionedits == null) { - return null; - } - FileSystem walFs = FSUtils.getWALFileSystem(conf); - if (walFs.exists(regionedits)) { - LOG.warn("Found old edits file. It could be the " - + "result of a previous failed split attempt. 
Deleting " + regionedits + ", length=" - + walFs.getFileStatus(regionedits).getLen()); - if (!walFs.delete(regionedits, false)) { - LOG.warn("Failed delete of old {}", regionedits); - } - } - WALProvider.Writer w = walSplitter.createWriter(regionedits); - LOG.debug("Creating writer path={}", regionedits); - return new WALSplitter.WriterAndPath(regionedits, w, entry.getKey().getSequenceId()); - } - - - - void filterCellByStore(WAL.Entry logEntry) { - Map maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores() - .get(Bytes.toString(logEntry.getKey().getEncodedRegionName())); - if (MapUtils.isEmpty(maxSeqIdInStores)) { - return; - } - // Create the array list for the cells that aren't filtered. - // We make the assumption that most cells will be kept. - ArrayList keptCells = new ArrayList<>(logEntry.getEdit().getCells().size()); - for (Cell cell : logEntry.getEdit().getCells()) { - if (WALEdit.isMetaEditFamily(cell)) { - keptCells.add(cell); - } else { - byte[] family = CellUtil.cloneFamily(cell); - Long maxSeqId = maxSeqIdInStores.get(family); - // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade, - // or the master was crashed before and we can not get the information. - if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) { - keptCells.add(cell); - } - } - } - - // Anything in the keptCells array list is still live. - // So rather than removing the cells from the array list - // which would be an O(n^2) operation, we just replace the list - logEntry.getEdit().setCells(keptCells); - } - - @Override - public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException { - appendBuffer(buffer, true); - } - - WALSplitter.WriterAndPath appendBuffer(WALSplitter.RegionEntryBuffer buffer, boolean reusable) - throws IOException { - List entries = buffer.entryBuffer; - if (entries.isEmpty()) { - LOG.warn("got an empty buffer, skipping"); - return null; - } - - WALSplitter.WriterAndPath wap = null; - - long startTime = System.nanoTime(); - try { - int editsCount = 0; - - for (WAL.Entry logEntry : entries) { - if (wap == null) { - wap = getWriterAndPath(logEntry, reusable); - if (wap == null) { - // This log spews the full edit. Can be massive in the log. Enable only debugging - // WAL lost edit issues. - LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry); - return null; - } - } - filterCellByStore(logEntry); - if (!logEntry.getEdit().isEmpty()) { - wap.writer.append(logEntry); - this.updateRegionMaximumEditLogSeqNum(logEntry); - editsCount++; - } else { - wap.incrementSkippedEdits(1); - } - } - // Pass along summary statistics - wap.incrementEdits(editsCount); - wap.incrementNanoTime(System.nanoTime() - startTime); - } catch (IOException e) { - e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e; - LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e); - throw e; - } - return wap; - } - - @Override - public boolean keepRegionEvent(WAL.Entry entry) { - ArrayList cells = entry.getEdit().getCells(); - for (Cell cell : cells) { - if (WALEdit.isCompactionMarker(cell)) { - return true; - } - } - return false; - } - - /** - * @return a map from encoded region ID to the number of edits written out for that region. 
- */ - @Override - public Map getOutputCounts() { - TreeMap ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); - for (Map.Entry entry : writers.entrySet()) { - ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten); - } - return ret; - } - - @Override - public int getNumberOfRecoveredRegions() { - return writers.size(); - } -} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java index 729ea8b3e62..4472f625335 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/OutputSink.java @@ -19,17 +19,18 @@ package org.apache.hadoop.hbase.wal; import java.io.IOException; import java.io.InterruptedIOException; -import java.util.Collections; +import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Set; -import java.util.TreeSet; -import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CompletionService; +import java.util.concurrent.ExecutorCompletionService; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; +import org.apache.hadoop.hbase.util.Threads; import org.apache.yetus.audience.InterfaceAudience; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,39 +42,36 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists; * ways of consuming recovered edits. */ @InterfaceAudience.Private -public abstract class OutputSink { +abstract class OutputSink { private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class); - protected WALSplitter.PipelineController controller; - protected EntryBuffers entryBuffers; + private final WALSplitter.PipelineController controller; + protected final EntryBuffers entryBuffers; - protected ConcurrentHashMap writers = new ConcurrentHashMap<>(); - protected final ConcurrentHashMap regionMaximumEditLogSeqNum = - new ConcurrentHashMap<>(); - - protected final List writerThreads = Lists.newArrayList(); - - /* Set of regions which we've decided should not output edits */ - protected final Set blacklistedRegions = - Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR)); - - protected boolean closeAndCleanCompleted = false; - - protected boolean writersClosed = false; + private final List writerThreads = Lists.newArrayList(); protected final int numThreads; protected CancelableProgressable reporter = null; - protected AtomicLong skippedEdits = new AtomicLong(); + protected final AtomicLong totalSkippedEdits = new AtomicLong(); - protected List splits = null; + protected final List splits = new ArrayList<>(); + + /** + * Used when close this output sink. 
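+ * The pool is bounded to the number of writer threads, and the concrete sinks shut it down once their
+ * outstanding close tasks have completed.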
+ */ + protected final ThreadPoolExecutor closeThreadPool; + protected final CompletionService closeCompletionService; public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { - numThreads = numWriters; + this.numThreads = numWriters; this.controller = controller; this.entryBuffers = entryBuffers; + this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, + Threads.newDaemonThreadFactory("split-log-closeStream-")); + this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool); } void setReporter(CancelableProgressable reporter) { @@ -83,7 +81,7 @@ public abstract class OutputSink { /** * Start the threads that will pump data from the entryBuffers to the output files. */ - public synchronized void startWriterThreads() { + synchronized void startWriterThreads() { for (int i = 0; i < numThreads; i++) { WriterThread t = new WriterThread(controller, entryBuffers, this, i); t.start(); @@ -91,49 +89,21 @@ public abstract class OutputSink { } } - /** - * Update region's maximum edit log SeqNum. - */ - void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) { - synchronized (regionMaximumEditLogSeqNum) { - String regionName = Bytes.toString(entry.getKey().getEncodedRegionName()); - Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName); - if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) { - regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId()); - } - } - } - - /** - * @return the number of currently opened writers - */ - int getNumOpenWriters() { - return this.writers.size(); - } - - long getSkippedEdits() { - return this.skippedEdits.get(); - } - /** * Wait for writer threads to dump all info to the sink + * * @return true when there is no error */ - protected boolean finishWriting(boolean interrupt) throws IOException { + boolean finishWriterThreads() throws IOException { LOG.debug("Waiting for split writer threads to finish"); - boolean progress_failed = false; + boolean progressFailed = false; for (WriterThread t : writerThreads) { t.finish(); } - if (interrupt) { - for (WriterThread t : writerThreads) { - t.interrupt(); // interrupt the writer threads. We are stopping now. - } - } for (WriterThread t : writerThreads) { - if (!progress_failed && reporter != null && !reporter.progress()) { - progress_failed = true; + if (!progressFailed && reporter != null && !reporter.progress()) { + progressFailed = true; } try { t.join(); @@ -144,41 +114,42 @@ public abstract class OutputSink { } } controller.checkForErrors(); - LOG.info("{} split writers finished; closing.", this.writerThreads.size()); - return (!progress_failed); + LOG.info("{} split writer threads finished", this.writerThreads.size()); + return (!progressFailed); } - public abstract List finishWritingAndClose() throws IOException; + long getTotalSkippedEdits() { + return this.totalSkippedEdits.get(); + } + + /** + * @return the number of currently opened writers + */ + abstract int getNumOpenWriters(); + + /** + * @param buffer A buffer of some number of edits for a given region. + */ + abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException; + + abstract List close() throws IOException; /** * @return a map from encoded region ID to the number of edits written out for that region. 
*/ - public abstract Map getOutputCounts(); + abstract Map getOutputCounts(); /** * @return number of regions we've recovered */ - public abstract int getNumberOfRecoveredRegions(); - - /** - * @param buffer A WAL Edit Entry - */ - public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException; - - /** - * WriterThread call this function to help flush internal remaining edits in buffer before close - * @return true when underlying sink has something to flush - */ - public boolean flush() throws IOException { - return false; - } + abstract int getNumberOfRecoveredRegions(); /** * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will * want to get all of those edits. * @return Return true if this sink wants to accept this region-level WALEdit. */ - public abstract boolean keepRegionEvent(WAL.Entry entry); + abstract boolean keepRegionEvent(WAL.Entry entry); public static class WriterThread extends Thread { private volatile boolean shouldStop = false; @@ -207,11 +178,11 @@ public abstract class OutputSink { private void doRun() throws IOException { LOG.trace("Writer thread starting"); while (true) { - WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); + EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite(); if (buffer == null) { // No data currently available, wait on some more to show up synchronized (controller.dataAvailable) { - if (shouldStop && !this.outputSink.flush()) { + if (shouldStop) { return; } try { @@ -234,15 +205,11 @@ public abstract class OutputSink { } } - private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException { + private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException { outputSink.append(buffer); } - void setShouldStop(boolean shouldStop) { - this.shouldStop = shouldStop; - } - - void finish() { + private void finish() { synchronized (controller.dataAvailable) { shouldStop = true; controller.dataAvailable.notifyAll(); diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java new file mode 100644 index 00000000000..ffe805faade --- /dev/null +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/RecoveredEditsOutputSink.java @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.wal; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.io.MultipleIOException; +import org.apache.yetus.audience.InterfaceAudience; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hbase.thirdparty.com.google.common.collect.Lists; + +/** + * Class that manages the output streams from the log splitting process. + * Every region only has one recovered edits. + */ +@InterfaceAudience.Private +class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink { + private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class); + private ConcurrentMap writers = new ConcurrentHashMap<>(); + + public RecoveredEditsOutputSink(WALSplitter walSplitter, + WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) { + super(walSplitter, controller, entryBuffers, numWriters); + } + + @Override + public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException { + List entries = buffer.entryBuffer; + if (entries.isEmpty()) { + LOG.warn("got an empty buffer, skipping"); + return; + } + RecoveredEditsWriter writer = + getRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName, + entries.get(0).getKey().getSequenceId()); + if (writer != null) { + writer.writeRegionEntries(entries); + } + } + + /** + * Get a writer and path for a log starting at the given entry. This function is threadsafe so + * long as multiple threads are always acting on different regions. + * @return null if this region shouldn't output any logs + */ + private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region, + long seqId) throws IOException { + RecoveredEditsWriter ret = writers.get(Bytes.toString(region)); + if (ret != null) { + return ret; + } + ret = createRecoveredEditsWriter(tableName, region, seqId); + if (ret == null) { + return null; + } + writers.put(Bytes.toString(region), ret); + return ret; + } + + @Override + public List close() throws IOException { + boolean isSuccessful = true; + try { + isSuccessful &= finishWriterThreads(); + } finally { + isSuccessful &= closeWriters(); + } + return isSuccessful ? splits : null; + } + + /** + * Close all of the output streams. + * + * @return true when there is no error. 
+ */ + private boolean closeWriters() throws IOException { + List thrown = Lists.newArrayList(); + for (RecoveredEditsWriter writer : writers.values()) { + closeCompletionService.submit(() -> { + Path dst = closeRecoveredEditsWriter(writer, thrown); + splits.add(dst); + return null; + }); + } + boolean progressFailed = false; + try { + for (int i = 0, n = this.writers.size(); i < n; i++) { + Future future = closeCompletionService.take(); + future.get(); + if (!progressFailed && reporter != null && !reporter.progress()) { + progressFailed = true; + } + } + } catch (InterruptedException e) { + IOException iie = new InterruptedIOException(); + iie.initCause(e); + throw iie; + } catch (ExecutionException e) { + throw new IOException(e.getCause()); + } finally { + closeThreadPool.shutdownNow(); + } + if (!thrown.isEmpty()) { + throw MultipleIOException.createIOException(thrown); + } + return !progressFailed; + } + + @Override + public Map getOutputCounts() { + TreeMap ret = new TreeMap<>(Bytes.BYTES_COMPARATOR); + for (Map.Entry entry : writers.entrySet()) { + ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten); + } + return ret; + } + + @Override + public int getNumberOfRecoveredRegions() { + return writers.size(); + } + + @Override + int getNumOpenWriters() { + return writers.size(); + } +} diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java index 3aba3095c23..ca0d8eff89e 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitUtil.java @@ -163,7 +163,9 @@ public final class WALSplitUtil { * named for the sequenceid in the passed logEntry: e.g. * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of * RECOVERED_EDITS_DIR under the region creating it if necessary. - * @param walEntry walEntry to recover + * @param tableName the table name + * @param encodedRegionName the encoded region name + * @param sedId the sequence id which used to generate file name * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name. 
* @param tmpDirName of the directory used to sideline old recovered edits file + * @param conf configuration @@ -172,12 +174,12 @@ public final class WALSplitUtil { */ @SuppressWarnings("deprecation") @VisibleForTesting - static Path getRegionSplitEditsPath(final WAL.Entry walEntry, String fileNameBeingSplit, - String tmpDirName, Configuration conf) throws IOException { + static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long seqId, + String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException { FileSystem walFS = FSUtils.getWALFileSystem(conf); - Path tableDir = FSUtils.getWALTableDir(conf, walEntry.getKey().getTableName()); - String encodedRegionName = Bytes.toString(walEntry.getKey().getEncodedRegionName()); - Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName); + Path tableDir = FSUtils.getWALTableDir(conf, tableName); + String encodedRegionNameStr = Bytes.toString(encodedRegionName); + Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr); Path dir = getRegionDirRecoveredEditsDir(regionDir); if (walFS.exists(dir) && walFS.isFile(dir)) { @@ -185,7 +187,7 @@ public final class WALSplitUtil { if (!walFS.exists(tmp)) { walFS.mkdirs(tmp); } - tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName); + tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr); LOG.warn("Found existing old file: {}. It could be some " + "leftover of an old installation. It should be a folder instead. " + "So moving it to {}", @@ -201,7 +203,7 @@ public final class WALSplitUtil { // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now. // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure // region's replayRecoveredEdits will not delete it - String fileName = formatRecoveredEditsFileName(walEntry.getKey().getSequenceId()); + String fileName = formatRecoveredEditsFileName(seqId); fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit); return new Path(dir, fileName); } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java index 5d07061ed8d..a435c78fa35 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/wal/WALSplitter.java @@ -38,9 +38,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination; -import org.apache.hadoop.hbase.io.HeapSize; import org.apache.hadoop.hbase.master.SplitLogManager; import org.apache.hadoop.hbase.monitoring.MonitoredTask; import org.apache.hadoop.hbase.monitoring.TaskMonitor; @@ -49,12 +47,10 @@ import org.apache.hadoop.hbase.regionserver.LastSequenceId; import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.CancelableProgressable; -import org.apache.hadoop.hbase.util.ClassSize; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WAL.Entry; import org.apache.hadoop.hbase.wal.WAL.Reader; -import org.apache.hadoop.hbase.wal.WALProvider.Writer; import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
import org.apache.hadoop.ipc.RemoteException; import org.apache.yetus.audience.InterfaceAudience; @@ -107,11 +103,12 @@ public class WALSplitter { // the file being split currently private FileStatus fileBeingSplit; - // if we limit the number of writers opened for sinking recovered edits - private final boolean splitWriterCreationBounded; + private final String tmpDirName; public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded"; - + public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize"; + public final static String SPLIT_WAL_WRITER_THREADS = + "hbase.regionserver.hlog.splitlog.writer.threads"; @VisibleForTesting WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS, @@ -127,20 +124,21 @@ public class WALSplitter { this.walFactory = factory; PipelineController controller = new PipelineController(); + this.tmpDirName = + conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY); - this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); - - entryBuffers = new EntryBuffers(controller, - this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024), - splitWriterCreationBounded); - - int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3); + // if we limit the number of writers opened for sinking recovered edits + boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false); + long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024); + int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3); if (splitWriterCreationBounded) { + entryBuffers = new BoundedEntryBuffers(controller, bufferSize); outputSink = - new BoundedLogWriterCreationOutputSink(this, controller, entryBuffers, numWriterThreads); + new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads); } else { + entryBuffers = new EntryBuffers(controller, bufferSize); outputSink = - new LogRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads); + new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads); } } @@ -152,6 +150,10 @@ public class WALSplitter { return fileBeingSplit; } + String getTmpDirName() { + return this.tmpDirName; + } + Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() { return regionMaxSeqIdInStores; } @@ -215,7 +217,7 @@ public class WALSplitter { int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024); Path logPath = logfile.getPath(); boolean outputSinkStarted = false; - boolean progress_failed = false; + boolean progressFailed = false; int editsCount = 0; int editsSkipped = 0; @@ -230,7 +232,7 @@ public class WALSplitter { logLength); status.setStatus("Opening log file"); if (reporter != null && !reporter.progress()) { - progress_failed = true; + progressFailed = true; return false; } logFileReader = getReader(logfile, skipErrors, reporter); @@ -288,11 +290,11 @@ public class WALSplitter { if (editsCount % interval == 0 || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) { numOpenedFilesLastCheck = this.getNumOpenWriters(); - String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits())) + String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits())) + " edits, skipped " + editsSkipped + " edits."; status.setStatus("Split " + countsStr); if (reporter != null && !reporter.progress()) { -
progress_failed = true; + progressFailed = true; return false; } } @@ -326,9 +328,9 @@ public class WALSplitter { try { if (outputSinkStarted) { // Set progress_failed to true as the immediate following statement will reset its value - // when finishWritingAndClose() throws exception, progress_failed has the right value - progress_failed = true; - progress_failed = outputSink.finishWritingAndClose() == null; + // when close() throws an exception, progressFailed has the right value + progressFailed = true; + progressFailed = outputSink.close() == null; } } finally { long processCost = EnvironmentEdgeManager.currentTime() - startTS; @@ -337,18 +339,18 @@ outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost + " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" + StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() + - ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed; + ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed; LOG.info(msg); status.markComplete(msg); } } - return !progress_failed; + return !progressFailed; } /** * Create a new {@link Reader} for reading logs to split. */ - protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter) + private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter) throws IOException, CorruptedLogFileException { Path path = file.getPath(); long length = file.getLen(); @@ -392,7 +394,7 @@ public class WALSplitter { return in; } - static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) + private Entry getNextLogLine(Reader in, Path path, boolean skipErrors) throws CorruptedLogFileException, IOException { try { return in.next(); @@ -475,98 +477,6 @@ public class WALSplitter { } } - /** - * A buffer of some number of edits for a given region. - * This accumulates edits and also provides a memory optimization in order to - * share a single byte array instance for the table and region name. - * Also tracks memory usage of the accumulated edits.
- */ - public static class RegionEntryBuffer implements HeapSize { - long heapInBuffer = 0; - List entryBuffer; - TableName tableName; - byte[] encodedRegionName; - - RegionEntryBuffer(TableName tableName, byte[] region) { - this.tableName = tableName; - this.encodedRegionName = region; - this.entryBuffer = new ArrayList<>(); - } - - long appendEntry(Entry entry) { - internify(entry); - entryBuffer.add(entry); - long incrHeap = entry.getEdit().heapSize() + - ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers - 0; // TODO linkedlist entry - heapInBuffer += incrHeap; - return incrHeap; - } - - private void internify(Entry entry) { - WALKeyImpl k = entry.getKey(); - k.internTableName(this.tableName); - k.internEncodedRegionName(this.encodedRegionName); - } - - @Override - public long heapSize() { - return heapInBuffer; - } - - public byte[] getEncodedRegionName() { - return encodedRegionName; - } - - public List getEntryBuffer() { - return entryBuffer; - } - - public TableName getTableName() { - return tableName; - } - } - - /** - * Class wraps the actual writer which writes data out and related statistics - */ - public abstract static class SinkWriter { - /* Count of edits written to this path */ - long editsWritten = 0; - /* Count of edits skipped to this path */ - long editsSkipped = 0; - /* Number of nanos spent writing to this log */ - long nanosSpent = 0; - - void incrementEdits(int edits) { - editsWritten += edits; - } - - void incrementSkippedEdits(int skipped) { - editsSkipped += skipped; - } - - void incrementNanoTime(long nanos) { - nanosSpent += nanos; - } - } - - /** - * Private data structure that wraps a Writer and its Path, also collecting statistics about the - * data written to this output. - */ - final static class WriterAndPath extends SinkWriter { - final Path path; - final Writer writer; - final long minLogSeqNum; - - WriterAndPath(final Path path, final Writer writer, final long minLogSeqNum) { - this.path = path; - this.writer = writer; - this.minLogSeqNum = minLogSeqNum; - } - } - static class CorruptedLogFileException extends Exception { private static final long serialVersionUID = 1L; @@ -583,6 +493,5 @@ public class WALSplitter { CorruptedLogFileException(String message, Throwable cause) { super(message, cause); } - } } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java index 741d4492198..4d6600d524a 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALMethods.java @@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.testclassification.SmallTests; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.FSUtils; import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController; -import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer; import org.junit.ClassRule; import org.junit.Test; import org.junit.experimental.categories.Category; @@ -134,7 +133,7 @@ public class TestWALMethods { @Test public void testRegionEntryBuffer() throws Exception { - WALSplitter.RegionEntryBuffer reb = new WALSplitter.RegionEntryBuffer( + EntryBuffers.RegionEntryBuffer reb = new EntryBuffers.RegionEntryBuffer( TEST_TABLE, TEST_REGION); assertEquals(0, reb.heapSize()); @@ -153,7 +152,7 @@ public class TestWALMethods { assertTrue(sink.totalBuffered > 0); long amountInChunk = sink.totalBuffered; // Get a chunk - RegionEntryBuffer chunk = 
sink.getChunkToWrite(); + EntryBuffers.RegionEntryBuffer chunk = sink.getChunkToWrite(); assertEquals(chunk.heapSize(), amountInChunk); // Make sure it got marked that a thread is "working on this" @@ -172,7 +171,7 @@ public class TestWALMethods { // to get the second sink.doneWriting(chunk); - RegionEntryBuffer chunk2 = sink.getChunkToWrite(); + EntryBuffers.RegionEntryBuffer chunk2 = sink.getChunkToWrite(); assertNotNull(chunk2); assertNotSame(chunk, chunk2); long amountInChunk2 = sink.totalBuffered; diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java index cfeaac8357d..8ddd0ea96a3 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/wal/TestWALSplit.java @@ -407,15 +407,15 @@ public class TestWALSplit { WALFactory.createRecoveredEditsWriter(fs, p, conf).close(); } - private Path createRecoveredEditsPathForRegion() throws IOException{ + private Path createRecoveredEditsPathForRegion() throws IOException { byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes(); long now = System.currentTimeMillis(); - Entry entry = - new Entry(new WALKeyImpl(encoded, - TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), + Entry entry = new Entry( + new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID), new WALEdit()); - Path p = WALSplitUtil.getRegionSplitEditsPath(entry, - FILENAME_BEING_SPLIT, TMPDIRNAME, conf); + Path p = WALSplitUtil + .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT, + TMPDIRNAME, conf); return p; }
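Note on the WALSplitUtil API change exercised by the test above: callers that previously handed a whole WAL.Entry to getRegionSplitEditsPath now pass the table name, encoded region name, and sequence id explicitly. A minimal, illustrative sketch of the new call shape (not part of the patch; it assumes a WAL.Entry named entry plus fileNameBeingSplit, tmpDirName, and conf already in scope):

  // Old, entry-based call shape removed by this patch:
  // Path p = WALSplitUtil.getRegionSplitEditsPath(entry, fileNameBeingSplit, tmpDirName, conf);

  // New call shape: the caller unpacks the fields from the entry's WALKey.
  Path p = WALSplitUtil.getRegionSplitEditsPath(
      entry.getKey().getTableName(),          // table the edits belong to
      entry.getKey().getEncodedRegionName(),  // encoded region name as bytes
      entry.getKey().getSequenceId(),         // sequence id used in the recovered.edits file name
      fileNameBeingSplit, tmpDirName, conf);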