HBASE-23298 Refactor LogRecoveredEditsOutputSink and BoundedLogWriterCreationOutputSink (#832)
Signed-off-by: Duo Zhang <zhangduo@apache.org>
parent 142997c04e
commit f7839f5eb6
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/AbstractRecoveredEditsOutputSink.java (new file)
@@ -0,0 +1,270 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;

import java.io.EOFException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

@InterfaceAudience.Private
abstract class AbstractRecoveredEditsOutputSink extends OutputSink {
  private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
  private final WALSplitter walSplitter;
  private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum = new ConcurrentHashMap<>();

  public AbstractRecoveredEditsOutputSink(WALSplitter walSplitter,
      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(controller, entryBuffers, numWriters);
    this.walSplitter = walSplitter;
  }

  /**
   * @return a writer that wraps a {@link WALProvider.Writer} and its Path. Caller should close.
   */
  protected RecoveredEditsWriter createRecoveredEditsWriter(TableName tableName, byte[] region,
      long seqId) throws IOException {
    Path regionEditsPath = getRegionSplitEditsPath(tableName, region, seqId,
      walSplitter.getFileBeingSplit().getPath().getName(), walSplitter.getTmpDirName(),
      walSplitter.conf);
    if (walSplitter.walFS.exists(regionEditsPath)) {
      LOG.warn("Found old edits file. It could be the " +
        "result of a previous failed split attempt. Deleting " + regionEditsPath + ", length=" +
        walSplitter.walFS.getFileStatus(regionEditsPath).getLen());
      if (!walSplitter.walFS.delete(regionEditsPath, false)) {
        LOG.warn("Failed delete of old {}", regionEditsPath);
      }
    }
    WALProvider.Writer w = walSplitter.createWriter(regionEditsPath);
    LOG.info("Creating recovered edits writer path={}", regionEditsPath);
    return new RecoveredEditsWriter(region, regionEditsPath, w, seqId);
  }

  protected Path closeRecoveredEditsWriter(RecoveredEditsWriter editsWriter,
      List<IOException> thrown) throws IOException {
    try {
      editsWriter.writer.close();
    } catch (IOException ioe) {
      LOG.error("Could not close recovered edits at {}", editsWriter.path, ioe);
      thrown.add(ioe);
      return null;
    }
LOG.info("Closed recovered edits writer path={} (wrote {} edits, skipped {} edits in {} ms",
|
||||||
|
editsWriter.path, editsWriter.editsWritten, editsWriter.editsSkipped,
|
||||||
|
editsWriter.nanosSpent / 1000 / 1000);
    if (editsWriter.editsWritten == 0) {
      // just remove the empty recovered.edits file
      if (walSplitter.walFS.exists(editsWriter.path) &&
          !walSplitter.walFS.delete(editsWriter.path, false)) {
        LOG.warn("Failed deleting empty {}", editsWriter.path);
        throw new IOException("Failed deleting empty " + editsWriter.path);
      }
      return null;
    }

    Path dst = getCompletedRecoveredEditsFilePath(editsWriter.path,
      regionMaximumEditLogSeqNum.get(Bytes.toString(editsWriter.encodedRegionName)));
    try {
      if (!dst.equals(editsWriter.path) && walSplitter.walFS.exists(dst)) {
        deleteOneWithFewerEntries(editsWriter, dst);
      }
      // Skip the unit tests which create a splitter that reads and
      // writes the data without touching disk.
      // TestHLogSplit#testThreading is an example.
      if (walSplitter.walFS.exists(editsWriter.path)) {
        if (!walSplitter.walFS.rename(editsWriter.path, dst)) {
          throw new IOException(
            "Failed renaming recovered edits " + editsWriter.path + " to " + dst);
        }
        LOG.info("Rename recovered edits {} to {}", editsWriter.path, dst);
      }
    } catch (IOException ioe) {
      LOG.error("Could not rename recovered edits {} to {}", editsWriter.path, dst, ioe);
      thrown.add(ioe);
      return null;
    }
    return dst;
  }

  @Override
  public boolean keepRegionEvent(WAL.Entry entry) {
    ArrayList<Cell> cells = entry.getEdit().getCells();
    for (Cell cell : cells) {
      if (WALEdit.isCompactionMarker(cell)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Update region's maximum edit log SeqNum.
   */
  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
    synchronized (regionMaximumEditLogSeqNum) {
      String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
      Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
      if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
        regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
      }
    }
  }

  // delete the one with fewer wal entries
  private void deleteOneWithFewerEntries(RecoveredEditsWriter editsWriter, Path dst)
      throws IOException {
    long dstMinLogSeqNum = -1L;
    try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
      WAL.Entry entry = reader.next();
      if (entry != null) {
        dstMinLogSeqNum = entry.getKey().getSequenceId();
      }
    } catch (EOFException e) {
      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
        e);
    }
    if (editsWriter.minLogSeqNum < dstMinLogSeqNum) {
      LOG.warn("Found existing old edits file. It could be the result of a previous failed" +
        " split attempt or we have duplicated wal entries. Deleting " + dst + ", length=" +
        walSplitter.walFS.getFileStatus(dst).getLen());
      if (!walSplitter.walFS.delete(dst, false)) {
        LOG.warn("Failed deleting of old {}", dst);
        throw new IOException("Failed deleting of old " + dst);
      }
    } else {
      LOG.warn(
        "Found existing old edits file and we have less entries. Deleting " + editsWriter.path +
          ", length=" + walSplitter.walFS.getFileStatus(editsWriter.path).getLen());
      if (!walSplitter.walFS.delete(editsWriter.path, false)) {
        LOG.warn("Failed deleting of {}", editsWriter.path);
        throw new IOException("Failed deleting of " + editsWriter.path);
      }
    }
  }

  /**
   * Private data structure that wraps a {@link WALProvider.Writer} and its Path, also collecting
   * statistics about the data written to this output.
   */
  final class RecoveredEditsWriter {
    /* Count of edits written to this path */
    long editsWritten = 0;
    /* Count of edits skipped to this path */
    long editsSkipped = 0;
    /* Number of nanos spent writing to this log */
    long nanosSpent = 0;

    final byte[] encodedRegionName;
    final Path path;
    final WALProvider.Writer writer;
    final long minLogSeqNum;

    RecoveredEditsWriter(byte[] encodedRegionName, Path path, WALProvider.Writer writer,
        long minLogSeqNum) {
      this.encodedRegionName = encodedRegionName;
      this.path = path;
      this.writer = writer;
      this.minLogSeqNum = minLogSeqNum;
    }

    private void incrementEdits(int edits) {
      editsWritten += edits;
    }

    private void incrementSkippedEdits(int skipped) {
      editsSkipped += skipped;
      totalSkippedEdits.addAndGet(skipped);
    }

    private void incrementNanoTime(long nanos) {
      nanosSpent += nanos;
    }

    void writeRegionEntries(List<WAL.Entry> entries) throws IOException {
      long startTime = System.nanoTime();
      try {
        int editsCount = 0;
        for (WAL.Entry logEntry : entries) {
          filterCellByStore(logEntry);
          if (!logEntry.getEdit().isEmpty()) {
            writer.append(logEntry);
            updateRegionMaximumEditLogSeqNum(logEntry);
            editsCount++;
          } else {
            incrementSkippedEdits(1);
          }
        }
        // Pass along summary statistics
        incrementEdits(editsCount);
        incrementNanoTime(System.nanoTime() - startTime);
      } catch (IOException e) {
        e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
        LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
        throw e;
      }
    }

    private void filterCellByStore(WAL.Entry logEntry) {
      Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
        .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
      if (MapUtils.isEmpty(maxSeqIdInStores)) {
        return;
      }
      // Create the array list for the cells that aren't filtered.
      // We make the assumption that most cells will be kept.
      ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
      for (Cell cell : logEntry.getEdit().getCells()) {
        if (WALEdit.isMetaEditFamily(cell)) {
          keptCells.add(cell);
        } else {
          byte[] family = CellUtil.cloneFamily(cell);
          Long maxSeqId = maxSeqIdInStores.get(family);
          // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
          // or the master was crashed before and we can not get the information.
          if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
            keptCells.add(cell);
          }
        }
      }

      // Anything in the keptCells array list is still live.
      // So rather than removing the cells from the array list
      // which would be an O(n^2) operation, we just replace the list
      logEntry.getEdit().setCells(keptCells);
    }
  }
}
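Taken together, the base class gives a subclass a three-step lifecycle per region chunk: create a temporary writer under recovered.edits, append the chunk, then close and rename the file to its completed path. A minimal sketch of how a subclass drives this (variable names here are illustrative, not from the commit; the pattern itself is the one BoundedRecoveredEditsOutputSink.append uses below):

  // Inside a subclass of AbstractRecoveredEditsOutputSink; 'entries' is one
  // region's buffered chunk of WAL.Entry objects.
  RecoveredEditsWriter writer = createRecoveredEditsWriter(
      tableName, encodedRegionName, entries.get(0).getKey().getSequenceId());
  writer.writeRegionEntries(entries);           // appends, counting skipped edits
  List<IOException> thrown = new ArrayList<>();
  Path finalPath = closeRecoveredEditsWriter(writer, thrown); // close + rename; null on failure
  if (!thrown.isEmpty()) {
    throw MultipleIOException.createIOException(thrown);
  }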
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedEntryBuffers.java (new file)
@@ -0,0 +1,44 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import org.apache.yetus.audience.InterfaceAudience;

/**
 * Used for {@link BoundedRecoveredEditsOutputSink}. The core of bounding the number of open
 * writers is that a chunk is returned only once the buffered heap size exceeds maxHeapUsage, so
 * a writer does not need to be created for every region during splitting. Each returned
 * {@link EntryBuffers.RegionEntryBuffer} is written to a recovered edits file and the writer is
 * closed immediately. See
 * {@link BoundedRecoveredEditsOutputSink#append(EntryBuffers.RegionEntryBuffer)} for more
 * details.
 */
@InterfaceAudience.Private
public class BoundedEntryBuffers extends EntryBuffers {

  public BoundedEntryBuffers(WALSplitter.PipelineController controller, long maxHeapUsage) {
    super(controller, maxHeapUsage);
  }

  @Override
  synchronized RegionEntryBuffer getChunkToWrite() {
    if (totalBuffered < maxHeapUsage) {
      return null;
    }
    return super.getChunkToWrite();
  }
}
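For context, the bounded buffer only matters if the splitter actually instantiates it. A hedged sketch of the selection the caller (WALSplitter, outside this excerpt) would make, assuming the pre-existing hbase.split.writer.creation.bounded switch still drives the choice after this refactor:

  // Assumption: the same boolean config that previously set the removed
  // splitWriterCreationBounded field now picks the EntryBuffers implementation.
  boolean bounded = conf.getBoolean("hbase.split.writer.creation.bounded", false);
  long bufferSize = conf.getLong("hbase.regionserver.hlog.splitlog.buffersize",
      128 * 1024 * 1024);
  EntryBuffers entryBuffers = bounded
      ? new BoundedEntryBuffers(controller, bufferSize)
      : new EntryBuffers(controller, bufferSize);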
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedLogWriterCreationOutputSink.java (deleted)
@@ -1,150 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class that manages the output streams from the log splitting process.
 * Bounded means the output streams will be no more than the size of threadpool
 */
@InterfaceAudience.Private
public class BoundedLogWriterCreationOutputSink extends LogRecoveredEditsOutputSink {
  private static final Logger LOG =
      LoggerFactory.getLogger(BoundedLogWriterCreationOutputSink.class);

  private ConcurrentHashMap<String, Long> regionRecoverStatMap = new ConcurrentHashMap<>();

  public BoundedLogWriterCreationOutputSink(WALSplitter walSplitter,
      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(walSplitter, controller, entryBuffers, numWriters);
  }

  @Override
  public List<Path> finishWritingAndClose() throws IOException {
    boolean isSuccessful;
    List<Path> result;
    try {
      isSuccessful = finishWriting(false);
    } finally {
      result = close();
    }
    if (isSuccessful) {
      splits = result;
    }
    return splits;
  }

  @Override
  boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
      List<Path> paths) throws InterruptedException, ExecutionException {
    for (final Map.Entry<byte[], WALSplitter.RegionEntryBuffer> buffer : entryBuffers.buffers
        .entrySet()) {
      LOG.info("Submitting writeThenClose of {}",
          Bytes.toString(buffer.getValue().encodedRegionName));
      completionService.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          Path dst = writeThenClose(buffer.getValue());
          paths.add(dst);
          return null;
        }
      });
    }
    boolean progress_failed = false;
    for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
      Future<Void> future = completionService.take();
      future.get();
      if (!progress_failed && reporter != null && !reporter.progress()) {
        progress_failed = true;
      }
    }

    return progress_failed;
  }

  /**
   * since the splitting process may create multiple output files, we need a map
   * regionRecoverStatMap to track the output count of each region.
   * @return a map from encoded region ID to the number of edits written out for that region.
   */
  @Override
  public Map<byte[], Long> getOutputCounts() {
    Map<byte[], Long> regionRecoverStatMapResult = new HashMap<>();
    for (Map.Entry<String, Long> entry : regionRecoverStatMap.entrySet()) {
      regionRecoverStatMapResult.put(Bytes.toBytes(entry.getKey()), entry.getValue());
    }
    return regionRecoverStatMapResult;
  }

  /**
   * @return the number of recovered regions
   */
  @Override
  public int getNumberOfRecoveredRegions() {
    return regionRecoverStatMap.size();
  }

  /**
   * Append the buffer to a new recovered edits file, then close it after all done
   * @param buffer contain all entries of a certain region
   * @throws IOException when closeWriter failed
   */
  @Override
  public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
    writeThenClose(buffer);
  }

  private Path writeThenClose(WALSplitter.RegionEntryBuffer buffer) throws IOException {
    WALSplitter.WriterAndPath wap = appendBuffer(buffer, false);
    if (wap != null) {
      String encodedRegionName = Bytes.toString(buffer.encodedRegionName);
      Long value = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
      if (value != null) {
        Long newValue = regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten;
        regionRecoverStatMap.put(encodedRegionName, newValue);
      }
    }

    Path dst = null;
    List<IOException> thrown = new ArrayList<>();
    if (wap != null) {
      dst = closeWriter(Bytes.toString(buffer.encodedRegionName), wap, thrown);
    }
    if (!thrown.isEmpty()) {
      throw MultipleIOException.createIOException(thrown);
    }
    return dst;
  }
}
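One detail worth noting about the deleted class: its per-region edit counting was a two-step putIfAbsent followed by a separate get-and-put, which is not atomic if two close tasks for the same region race. The replacement sink in the next hunk collapses this into a single atomic map update. Side by side:

  // Old, racy shape (two separate map operations):
  Long prev = regionRecoverStatMap.putIfAbsent(encodedRegionName, wap.editsWritten);
  if (prev != null) {
    regionRecoverStatMap.put(encodedRegionName,
        regionRecoverStatMap.get(encodedRegionName) + wap.editsWritten);
  }
  // New, atomic shape used by BoundedRecoveredEditsOutputSink:
  regionEditsWrittenMap.compute(buffer.encodedRegionName,
      (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten);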
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/BoundedRecoveredEditsOutputSink.java (new file)
@@ -0,0 +1,141 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Class that manages the output streams from the log splitting process. Every region may have
 * many recovered edits files, but the number of writers open at once is bounded: it will be no
 * more than the size of the thread pool.
 */
@InterfaceAudience.Private
class BoundedRecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
  private static final Logger LOG =
      LoggerFactory.getLogger(BoundedRecoveredEditsOutputSink.class);

  // Since the splitting process may create multiple output files, we need a map
  // to track the output count of each region.
  private ConcurrentHashMap<byte[], Long> regionEditsWrittenMap = new ConcurrentHashMap<>();
  // Need a counter to track the opening writers.
  private final AtomicInteger openingWritersNum = new AtomicInteger(0);

  public BoundedRecoveredEditsOutputSink(WALSplitter walSplitter,
      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    super(walSplitter, controller, entryBuffers, numWriters);
  }

  @Override
  public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
    List<WAL.Entry> entries = buffer.entryBuffer;
    if (entries.isEmpty()) {
      LOG.warn("got an empty buffer, skipping");
      return;
    }
    // The key point is to create a new writer, write the edits, then close the writer.
    RecoveredEditsWriter writer =
        createRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName,
            entries.get(0).getKey().getSequenceId());
    if (writer != null) {
      openingWritersNum.incrementAndGet();
      writer.writeRegionEntries(entries);
      regionEditsWrittenMap.compute(buffer.encodedRegionName,
        (k, v) -> v == null ? writer.editsWritten : v + writer.editsWritten);
      List<IOException> thrown = new ArrayList<>();
      Path dst = closeRecoveredEditsWriter(writer, thrown);
      splits.add(dst);
      openingWritersNum.decrementAndGet();
      if (!thrown.isEmpty()) {
        throw MultipleIOException.createIOException(thrown);
      }
    }
  }

  @Override
  public List<Path> close() throws IOException {
    boolean isSuccessful = true;
    try {
      isSuccessful &= finishWriterThreads();
    } finally {
      isSuccessful &= writeRemainingEntryBuffers();
    }
    return isSuccessful ? splits : null;
  }

  /**
   * Write out the remaining RegionEntryBuffers and close the writers.
   *
   * @return true when there is no error.
   */
  private boolean writeRemainingEntryBuffers() throws IOException {
    for (EntryBuffers.RegionEntryBuffer buffer : entryBuffers.buffers.values()) {
      closeCompletionService.submit(() -> {
        append(buffer);
        return null;
      });
    }
    boolean progressFailed = false;
    try {
      for (int i = 0, n = entryBuffers.buffers.size(); i < n; i++) {
        Future<Void> future = closeCompletionService.take();
        future.get();
        if (!progressFailed && reporter != null && !reporter.progress()) {
          progressFailed = true;
        }
      }
    } catch (InterruptedException e) {
      IOException iie = new InterruptedIOException();
      iie.initCause(e);
      throw iie;
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      closeThreadPool.shutdownNow();
    }
    return !progressFailed;
  }

  @Override
  public Map<byte[], Long> getOutputCounts() {
    return regionEditsWrittenMap;
  }

  @Override
  public int getNumberOfRecoveredRegions() {
    return regionEditsWrittenMap.size();
  }

  @Override
  int getNumOpenWriters() {
    return openingWritersNum.get();
  }
}
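As a usage sketch, this sink is driven in two phases: while the WAL is read, writer threads pump chunks through append(), and close() then drains whatever is still buffered via the close pool. Roughly (the reader loop is paraphrased; WALSplitter's actual setup is outside this excerpt):

  // Paraphrased driver; 'reader', 'entryBuffers', and 'outputSink' come from
  // WALSplitter's setup, which this excerpt does not include.
  outputSink.startWriterThreads();        // writer threads consume chunks via append()
  for (WAL.Entry entry = reader.next(); entry != null; entry = reader.next()) {
    entryBuffers.appendEntry(entry);      // blocks once totalBuffered crosses maxHeapUsage
  }
  List<Path> splits = outputSink.close(); // drains leftovers on the close pool; null on failure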
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/EntryBuffers.java (modified)
@@ -18,58 +18,56 @@
 package org.apache.hadoop.hbase.wal;

 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
 import java.util.TreeSet;

+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
+
 /**
  * Class which accumulates edits and separates them into a buffer per region while simultaneously
  * accounting RAM usage. Blocks if the RAM usage crosses a predefined threshold. Writer threads then
  * pull region-specific buffers from this class.
  */
 @InterfaceAudience.Private
-public class EntryBuffers {
+class EntryBuffers {
   private static final Logger LOG = LoggerFactory.getLogger(EntryBuffers.class);

-  PipelineController controller;
+  private final PipelineController controller;

-  Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+  final Map<byte[], RegionEntryBuffer> buffers = new TreeMap<>(Bytes.BYTES_COMPARATOR);

   /*
    * Track which regions are currently in the middle of writing. We don't allow an IO thread to pick
    * up bytes from a region if we're already writing data for that region in a different IO thread.
    */
-  Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);
+  private final Set<byte[]> currentlyWriting = new TreeSet<>(Bytes.BYTES_COMPARATOR);

-  long totalBuffered = 0;
-  long maxHeapUsage;
-  boolean splitWriterCreationBounded;
+  protected long totalBuffered = 0;
+  protected final long maxHeapUsage;

   public EntryBuffers(PipelineController controller, long maxHeapUsage) {
-    this(controller, maxHeapUsage, false);
-  }
-
-  public EntryBuffers(PipelineController controller, long maxHeapUsage,
-      boolean splitWriterCreationBounded) {
     this.controller = controller;
     this.maxHeapUsage = maxHeapUsage;
-    this.splitWriterCreationBounded = splitWriterCreationBounded;
   }

   /**
    * Append a log entry into the corresponding region buffer. Blocks if the total heap usage has
    * crossed the specified threshold.
    */
-  public void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
+  void appendEntry(WAL.Entry entry) throws InterruptedException, IOException {
     WALKey key = entry.getKey();
     RegionEntryBuffer buffer;
     long incrHeap;
@@ -98,13 +96,6 @@ public class EntryBuffers {
    * @return RegionEntryBuffer a buffer of edits to be written.
    */
   synchronized RegionEntryBuffer getChunkToWrite() {
-    // The core part of limiting opening writers is it doesn't return chunk only if the
-    // heap size is over maxHeapUsage. Thus it doesn't need to create a writer for each
-    // region during splitting. It will flush all the logs in the buffer after splitting
-    // through a threadpool, which means the number of writers it created is under control.
-    if (splitWriterCreationBounded && totalBuffered < maxHeapUsage) {
-      return null;
-    }
     long biggestSize = 0;
     byte[] biggestBufferKey = null;

@@ -138,21 +129,56 @@ public class EntryBuffers {
     }
   }

+  @VisibleForTesting
   synchronized boolean isRegionCurrentlyWriting(byte[] region) {
     return currentlyWriting.contains(region);
   }

-  public void waitUntilDrained() {
-    synchronized (controller.dataAvailable) {
-      while (totalBuffered > 0) {
-        try {
-          controller.dataAvailable.wait(2000);
-        } catch (InterruptedException e) {
-          LOG.warn("Got interrupted while waiting for EntryBuffers is drained");
-          Thread.interrupted();
-          break;
-        }
-      }
-    }
-  }
+  /**
+   * A buffer of some number of edits for a given region.
+   * This accumulates edits and also provides a memory optimization in order to
+   * share a single byte array instance for the table and region name.
+   * Also tracks memory usage of the accumulated edits.
+   */
+  static class RegionEntryBuffer implements HeapSize {
+    private long heapInBuffer = 0;
+    final List<WAL.Entry> entryBuffer;
+    final TableName tableName;
+    final byte[] encodedRegionName;
+
+    RegionEntryBuffer(TableName tableName, byte[] region) {
+      this.tableName = tableName;
+      this.encodedRegionName = region;
+      this.entryBuffer = new ArrayList<>();
+    }
+
+    long appendEntry(WAL.Entry entry) {
+      internify(entry);
+      entryBuffer.add(entry);
+      // TODO linkedlist entry
+      long incrHeap = entry.getEdit().heapSize() +
+        ClassSize.align(2 * ClassSize.REFERENCE); // WALKey pointers
+      heapInBuffer += incrHeap;
+      return incrHeap;
+    }
+
+    private void internify(WAL.Entry entry) {
+      WALKeyImpl k = entry.getKey();
+      k.internTableName(this.tableName);
+      k.internEncodedRegionName(this.encodedRegionName);
+    }
+
+    @Override
+    public long heapSize() {
+      return heapInBuffer;
+    }
+
+    public byte[] getEncodedRegionName() {
+      return encodedRegionName;
+    }
+
+    public TableName getTableName() {
+      return tableName;
+    }
+  }
 }
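The relocated RegionEntryBuffer is also where the memory accounting happens that BoundedEntryBuffers gates on: each appended entry charges its edit's heap size plus two aligned references for the interned table and region names. Restated on its own, the per-entry charge accumulated into totalBuffered is:

  // From RegionEntryBuffer.appendEntry() above: the edit's own heap size plus
  // two aligned references for the WALKey's interned table/region pointers.
  long incrHeap = entry.getEdit().heapSize()
      + ClassSize.align(2 * ClassSize.REFERENCE);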
hbase-server/src/main/java/org/apache/hadoop/hbase/wal/LogRecoveredEditsOutputSink.java (deleted)
@@ -1,460 +0,0 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hbase.wal;

import static org.apache.hadoop.hbase.wal.WALSplitUtil.getCompletedRecoveredEditsFilePath;
import static org.apache.hadoop.hbase.wal.WALSplitUtil.getRegionSplitEditsPath;

import java.io.EOFException;
import java.io.IOException;
import java.io.InterruptedIOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellUtil;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.log.HBaseMarkers;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.io.MultipleIOException;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.CollectionUtils;
import org.apache.hbase.thirdparty.org.apache.commons.collections4.MapUtils;

/**
 * Class that manages the output streams from the log splitting process.
 */
@InterfaceAudience.Private
public class LogRecoveredEditsOutputSink extends OutputSink {
  private static final Logger LOG = LoggerFactory.getLogger(LogRecoveredEditsOutputSink.class);
  private WALSplitter walSplitter;
  private FileSystem walFS;
  private Configuration conf;

  public LogRecoveredEditsOutputSink(WALSplitter walSplitter,
      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
    // More threads could potentially write faster at the expense
    // of causing more disk seeks as the logs are split.
    // 3. After a certain setting (probably around 3) the
    // process will be bound on the reader in the current
    // implementation anyway.
    super(controller, entryBuffers, numWriters);
    this.walSplitter = walSplitter;
    this.walFS = walSplitter.walFS;
    this.conf = walSplitter.conf;
  }

  /**
   * @return null if failed to report progress
   */
  @Override
  public List<Path> finishWritingAndClose() throws IOException {
    boolean isSuccessful = false;
    List<Path> result = null;
    try {
      isSuccessful = finishWriting(false);
    } finally {
      result = close();
      List<IOException> thrown = closeLogWriters(null);
      if (CollectionUtils.isNotEmpty(thrown)) {
        throw MultipleIOException.createIOException(thrown);
      }
    }
    if (isSuccessful) {
      splits = result;
    }
    return splits;
  }

  // delete the one with fewer wal entries
  private void deleteOneWithFewerEntries(WALSplitter.WriterAndPath wap, Path dst)
      throws IOException {
    long dstMinLogSeqNum = -1L;
    try (WAL.Reader reader = walSplitter.getWalFactory().createReader(walSplitter.walFS, dst)) {
      WAL.Entry entry = reader.next();
      if (entry != null) {
        dstMinLogSeqNum = entry.getKey().getSequenceId();
      }
    } catch (EOFException e) {
      LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?", dst,
        e);
    }
    if (wap.minLogSeqNum < dstMinLogSeqNum) {
      LOG.warn("Found existing old edits file. It could be the result of a previous failed"
          + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
          + walFS.getFileStatus(dst).getLen());
      if (!walFS.delete(dst, false)) {
        LOG.warn("Failed deleting of old {}", dst);
        throw new IOException("Failed deleting of old " + dst);
      }
    } else {
      LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.path
          + ", length=" + walFS.getFileStatus(wap.path).getLen());
      if (!walFS.delete(wap.path, false)) {
        LOG.warn("Failed deleting of {}", wap.path);
        throw new IOException("Failed deleting of " + wap.path);
      }
    }
  }

  /**
   * Close all of the output streams.
   * @return the list of paths written.
   */
  List<Path> close() throws IOException {
    Preconditions.checkState(!closeAndCleanCompleted);

    final List<Path> paths = new ArrayList<>();
    final List<IOException> thrown = Lists.newArrayList();
    ThreadPoolExecutor closeThreadPool =
        Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS, new ThreadFactory() {
          private int count = 1;

          @Override
          public Thread newThread(Runnable r) {
            Thread t = new Thread(r, "split-log-closeStream-" + count++);
            return t;
          }
        });
    CompletionService<Void> completionService = new ExecutorCompletionService<>(closeThreadPool);
    boolean progress_failed;
    try {
      progress_failed = executeCloseTask(completionService, thrown, paths);
    } catch (InterruptedException e) {
      IOException iie = new InterruptedIOException();
      iie.initCause(e);
      throw iie;
    } catch (ExecutionException e) {
      throw new IOException(e.getCause());
    } finally {
      closeThreadPool.shutdownNow();
    }
    if (!thrown.isEmpty()) {
      throw MultipleIOException.createIOException(thrown);
    }
    writersClosed = true;
    closeAndCleanCompleted = true;
    if (progress_failed) {
      return null;
    }
    return paths;
  }

  /**
   * @param completionService threadPool to execute the closing tasks
   * @param thrown store the exceptions
   * @param paths arrayList to store the paths written
   * @return if close tasks executed successful
   */
  boolean executeCloseTask(CompletionService<Void> completionService, List<IOException> thrown,
      List<Path> paths) throws InterruptedException, ExecutionException {
    for (final Map.Entry<String, WALSplitter.SinkWriter> writersEntry : writers.entrySet()) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(
          "Submitting close of " + ((WALSplitter.WriterAndPath) writersEntry.getValue()).path);
      }
      completionService.submit(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
          WALSplitter.WriterAndPath wap = (WALSplitter.WriterAndPath) writersEntry.getValue();
          Path dst = closeWriter(writersEntry.getKey(), wap, thrown);
          paths.add(dst);
          return null;
        }
      });
    }
    boolean progress_failed = false;
    for (int i = 0, n = this.writers.size(); i < n; i++) {
      Future<Void> future = completionService.take();
      future.get();
      if (!progress_failed && reporter != null && !reporter.progress()) {
        progress_failed = true;
      }
    }
    return progress_failed;
  }

  Path closeWriter(String encodedRegionName, WALSplitter.WriterAndPath wap,
      List<IOException> thrown) throws IOException {
    LOG.trace("Closing {}", wap.path);
    try {
      wap.writer.close();
    } catch (IOException ioe) {
      LOG.error("Could not close log at {}", wap.path, ioe);
      thrown.add(ioe);
      return null;
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Closed wap " + wap.path + " (wrote " + wap.editsWritten + " edits, skipped "
          + wap.editsSkipped + " edits in " + (wap.nanosSpent / 1000 / 1000) + "ms");
    }
    if (wap.editsWritten == 0) {
      // just remove the empty recovered.edits file
      if (walFS.exists(wap.path) && !walFS.delete(wap.path, false)) {
        LOG.warn("Failed deleting empty {}", wap.path);
        throw new IOException("Failed deleting empty " + wap.path);
      }
      return null;
    }

    Path dst = getCompletedRecoveredEditsFilePath(wap.path,
        regionMaximumEditLogSeqNum.get(encodedRegionName));
    try {
      if (!dst.equals(wap.path) && walFS.exists(dst)) {
        deleteOneWithFewerEntries(wap, dst);
      }
      // Skip the unit tests which create a splitter that reads and
      // writes the data without touching disk.
      // TestHLogSplit#testThreading is an example.
      if (walFS.exists(wap.path)) {
        if (!walFS.rename(wap.path, dst)) {
          throw new IOException("Failed renaming " + wap.path + " to " + dst);
        }
        LOG.info("Rename {} to {}", wap.path, dst);
      }
    } catch (IOException ioe) {
      LOG.error("Could not rename {} to {}", wap.path, dst, ioe);
      thrown.add(ioe);
      return null;
    }
    return dst;
  }

  private List<IOException> closeLogWriters(List<IOException> thrown) throws IOException {
    if (writersClosed) {
      return thrown;
    }
    if (thrown == null) {
      thrown = Lists.newArrayList();
    }
    try {
      for (WriterThread writerThread : writerThreads) {
        while (writerThread.isAlive()) {
          writerThread.setShouldStop(true);
          writerThread.interrupt();
          try {
            writerThread.join(10);
          } catch (InterruptedException e) {
            IOException iie = new InterruptedIOException();
            iie.initCause(e);
            throw iie;
          }
        }
      }
    } finally {
      WALSplitter.WriterAndPath wap = null;
      for (WALSplitter.SinkWriter tmpWAP : writers.values()) {
        try {
          wap = (WALSplitter.WriterAndPath) tmpWAP;
          wap.writer.close();
        } catch (IOException ioe) {
          LOG.error("Couldn't close log at {}", wap.path, ioe);
          thrown.add(ioe);
          continue;
        }
        LOG.info("Closed log " + wap.path + " (wrote " + wap.editsWritten + " edits in "
            + (wap.nanosSpent / 1000 / 1000) + "ms)");
      }
      writersClosed = true;
    }

    return thrown;
  }

  /**
   * Get a writer and path for a log starting at the given entry. This function is threadsafe so
   * long as multiple threads are always acting on different regions.
   * @return null if this region shouldn't output any logs
   */
  WALSplitter.WriterAndPath getWriterAndPath(WAL.Entry entry, boolean reusable) throws IOException {
    byte[] region = entry.getKey().getEncodedRegionName();
    String regionName = Bytes.toString(region);
    WALSplitter.WriterAndPath ret = (WALSplitter.WriterAndPath) writers.get(regionName);
    if (ret != null) {
      return ret;
    }
    // If we already decided that this region doesn't get any output
    // we don't need to check again.
    if (blacklistedRegions.contains(region)) {
      return null;
    }
    ret = createWAP(region, entry);
    if (ret == null) {
      blacklistedRegions.add(region);
      return null;
    }
    if (reusable) {
      writers.put(regionName, ret);
    }
    return ret;
  }

  /**
   * @return a path with a write for that path. caller should close.
   */
  WALSplitter.WriterAndPath createWAP(byte[] region, WAL.Entry entry) throws IOException {
    String tmpDirName = walSplitter.conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY,
        HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);
    Path regionedits = getRegionSplitEditsPath(entry,
        walSplitter.getFileBeingSplit().getPath().getName(), tmpDirName, conf);
    if (regionedits == null) {
      return null;
    }
    FileSystem walFs = FSUtils.getWALFileSystem(conf);
    if (walFs.exists(regionedits)) {
      LOG.warn("Found old edits file. It could be the "
          + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
          + walFs.getFileStatus(regionedits).getLen());
      if (!walFs.delete(regionedits, false)) {
        LOG.warn("Failed delete of old {}", regionedits);
      }
    }
    WALProvider.Writer w = walSplitter.createWriter(regionedits);
    LOG.debug("Creating writer path={}", regionedits);
    return new WALSplitter.WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
  }

  void filterCellByStore(WAL.Entry logEntry) {
    Map<byte[], Long> maxSeqIdInStores = walSplitter.getRegionMaxSeqIdInStores()
        .get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
    if (MapUtils.isEmpty(maxSeqIdInStores)) {
      return;
    }
    // Create the array list for the cells that aren't filtered.
    // We make the assumption that most cells will be kept.
    ArrayList<Cell> keptCells = new ArrayList<>(logEntry.getEdit().getCells().size());
    for (Cell cell : logEntry.getEdit().getCells()) {
      if (WALEdit.isMetaEditFamily(cell)) {
        keptCells.add(cell);
      } else {
        byte[] family = CellUtil.cloneFamily(cell);
        Long maxSeqId = maxSeqIdInStores.get(family);
        // Do not skip cell even if maxSeqId is null. Maybe we are in a rolling upgrade,
        // or the master was crashed before and we can not get the information.
        if (maxSeqId == null || maxSeqId.longValue() < logEntry.getKey().getSequenceId()) {
          keptCells.add(cell);
        }
      }
    }

    // Anything in the keptCells array list is still live.
    // So rather than removing the cells from the array list
    // which would be an O(n^2) operation, we just replace the list
    logEntry.getEdit().setCells(keptCells);
  }

  @Override
  public void append(WALSplitter.RegionEntryBuffer buffer) throws IOException {
    appendBuffer(buffer, true);
  }

  WALSplitter.WriterAndPath appendBuffer(WALSplitter.RegionEntryBuffer buffer, boolean reusable)
      throws IOException {
    List<WAL.Entry> entries = buffer.entryBuffer;
    if (entries.isEmpty()) {
      LOG.warn("got an empty buffer, skipping");
      return null;
    }

    WALSplitter.WriterAndPath wap = null;

    long startTime = System.nanoTime();
    try {
      int editsCount = 0;

      for (WAL.Entry logEntry : entries) {
        if (wap == null) {
          wap = getWriterAndPath(logEntry, reusable);
          if (wap == null) {
            // This log spews the full edit. Can be massive in the log. Enable only debugging
            // WAL lost edit issues.
            LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
            return null;
          }
        }
        filterCellByStore(logEntry);
        if (!logEntry.getEdit().isEmpty()) {
          wap.writer.append(logEntry);
          this.updateRegionMaximumEditLogSeqNum(logEntry);
          editsCount++;
        } else {
          wap.incrementSkippedEdits(1);
        }
      }
      // Pass along summary statistics
      wap.incrementEdits(editsCount);
      wap.incrementNanoTime(System.nanoTime() - startTime);
    } catch (IOException e) {
      e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
      LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
      throw e;
    }
    return wap;
  }

  @Override
  public boolean keepRegionEvent(WAL.Entry entry) {
    ArrayList<Cell> cells = entry.getEdit().getCells();
    for (Cell cell : cells) {
      if (WALEdit.isCompactionMarker(cell)) {
        return true;
      }
    }
    return false;
  }

  /**
   * @return a map from encoded region ID to the number of edits written out for that region.
   */
  @Override
  public Map<byte[], Long> getOutputCounts() {
    TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
    for (Map.Entry<String, WALSplitter.SinkWriter> entry : writers.entrySet()) {
      ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
    }
    return ret;
  }

  @Override
  public int getNumberOfRecoveredRegions() {
    return writers.size();
  }
}
OutputSink.java:

@@ -19,17 +19,18 @@ package org.apache.hadoop.hbase.wal;

 import java.io.IOException;
 import java.io.InterruptedIOException;
-import java.util.Collections;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import java.util.TreeSet;
-import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CompletionService;
+import java.util.concurrent.ExecutorCompletionService;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;

 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
+import org.apache.hadoop.hbase.util.Threads;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -41,39 +42,36 @@ import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
  * ways of consuming recovered edits.
  */
 @InterfaceAudience.Private
-public abstract class OutputSink {
+abstract class OutputSink {
   private static final Logger LOG = LoggerFactory.getLogger(OutputSink.class);

-  protected WALSplitter.PipelineController controller;
-  protected EntryBuffers entryBuffers;
+  private final WALSplitter.PipelineController controller;
+  protected final EntryBuffers entryBuffers;

-  protected ConcurrentHashMap<String, WALSplitter.SinkWriter> writers = new ConcurrentHashMap<>();
-  protected final ConcurrentHashMap<String, Long> regionMaximumEditLogSeqNum =
-      new ConcurrentHashMap<>();
-
-  protected final List<WriterThread> writerThreads = Lists.newArrayList();
-
-  /* Set of regions which we've decided should not output edits */
-  protected final Set<byte[]> blacklistedRegions =
-      Collections.synchronizedSet(new TreeSet<>(Bytes.BYTES_COMPARATOR));
-
-  protected boolean closeAndCleanCompleted = false;
-
-  protected boolean writersClosed = false;
+  private final List<WriterThread> writerThreads = Lists.newArrayList();

   protected final int numThreads;

   protected CancelableProgressable reporter = null;

-  protected AtomicLong skippedEdits = new AtomicLong();
+  protected final AtomicLong totalSkippedEdits = new AtomicLong();

-  protected List<Path> splits = null;
+  protected final List<Path> splits = new ArrayList<>();
+
+  /**
+   * Used when closing this output sink.
+   */
+  protected final ThreadPoolExecutor closeThreadPool;
+  protected final CompletionService<Void> closeCompletionService;

   public OutputSink(WALSplitter.PipelineController controller, EntryBuffers entryBuffers,
       int numWriters) {
-    numThreads = numWriters;
+    this.numThreads = numWriters;
     this.controller = controller;
     this.entryBuffers = entryBuffers;
+    this.closeThreadPool = Threads.getBoundedCachedThreadPool(numThreads, 30L, TimeUnit.SECONDS,
+        Threads.newDaemonThreadFactory("split-log-closeStream-"));
+    this.closeCompletionService = new ExecutorCompletionService<>(closeThreadPool);
   }

   void setReporter(CancelableProgressable reporter) {
@@ -83,7 +81,7 @@ public abstract class OutputSink {
   /**
    * Start the threads that will pump data from the entryBuffers to the output files.
    */
-  public synchronized void startWriterThreads() {
+  synchronized void startWriterThreads() {
     for (int i = 0; i < numThreads; i++) {
       WriterThread t = new WriterThread(controller, entryBuffers, this, i);
       t.start();
@@ -91,49 +89,21 @@ public abstract class OutputSink {
     }
   }

-  /**
-   * Update region's maximum edit log SeqNum.
-   */
-  void updateRegionMaximumEditLogSeqNum(WAL.Entry entry) {
-    synchronized (regionMaximumEditLogSeqNum) {
-      String regionName = Bytes.toString(entry.getKey().getEncodedRegionName());
-      Long currentMaxSeqNum = regionMaximumEditLogSeqNum.get(regionName);
-      if (currentMaxSeqNum == null || entry.getKey().getSequenceId() > currentMaxSeqNum) {
-        regionMaximumEditLogSeqNum.put(regionName, entry.getKey().getSequenceId());
-      }
-    }
-  }
-
-  /**
-   * @return the number of currently opened writers
-   */
-  int getNumOpenWriters() {
-    return this.writers.size();
-  }
-
-  long getSkippedEdits() {
-    return this.skippedEdits.get();
-  }
-
   /**
    * Wait for writer threads to dump all info to the sink
+   *
    * @return true when there is no error
    */
-  protected boolean finishWriting(boolean interrupt) throws IOException {
+  boolean finishWriterThreads() throws IOException {
     LOG.debug("Waiting for split writer threads to finish");
-    boolean progress_failed = false;
+    boolean progressFailed = false;
    for (WriterThread t : writerThreads) {
       t.finish();
     }
-    if (interrupt) {
-      for (WriterThread t : writerThreads) {
-        t.interrupt(); // interrupt the writer threads. We are stopping now.
-      }
-    }

     for (WriterThread t : writerThreads) {
-      if (!progress_failed && reporter != null && !reporter.progress()) {
-        progress_failed = true;
+      if (!progressFailed && reporter != null && !reporter.progress()) {
+        progressFailed = true;
       }
       try {
         t.join();
@@ -144,41 +114,42 @@ public abstract class OutputSink {
       }
     }
     controller.checkForErrors();
-    LOG.info("{} split writers finished; closing.", this.writerThreads.size());
-    return (!progress_failed);
+    LOG.info("{} split writer threads finished", this.writerThreads.size());
+    return (!progressFailed);
   }

-  public abstract List<Path> finishWritingAndClose() throws IOException;
+  long getTotalSkippedEdits() {
+    return this.totalSkippedEdits.get();
+  }
+
+  /**
+   * @return the number of currently opened writers
+   */
+  abstract int getNumOpenWriters();
+
+  /**
+   * @param buffer A buffer of some number of edits for a given region.
+   */
+  abstract void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException;
+
+  abstract List<Path> close() throws IOException;

   /**
    * @return a map from encoded region ID to the number of edits written out for that region.
    */
-  public abstract Map<byte[], Long> getOutputCounts();
+  abstract Map<byte[], Long> getOutputCounts();

   /**
    * @return number of regions we've recovered
    */
-  public abstract int getNumberOfRecoveredRegions();
-
-  /**
-   * @param buffer A WAL Edit Entry
-   */
-  public abstract void append(WALSplitter.RegionEntryBuffer buffer) throws IOException;
-
-  /**
-   * WriterThread call this function to help flush internal remaining edits in buffer before close
-   * @return true when underlying sink has something to flush
-   */
-  public boolean flush() throws IOException {
-    return false;
-  }
+  abstract int getNumberOfRecoveredRegions();

   /**
    * Some WALEdit's contain only KV's for account on what happened to a region. Not all sinks will
    * want to get all of those edits.
    * @return Return true if this sink wants to accept this region-level WALEdit.
    */
-  public abstract boolean keepRegionEvent(WAL.Entry entry);
+  abstract boolean keepRegionEvent(WAL.Entry entry);

   public static class WriterThread extends Thread {
     private volatile boolean shouldStop = false;
@@ -207,11 +178,11 @@ public abstract class OutputSink {
     private void doRun() throws IOException {
       LOG.trace("Writer thread starting");
       while (true) {
-        WALSplitter.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
+        EntryBuffers.RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
         if (buffer == null) {
           // No data currently available, wait on some more to show up
           synchronized (controller.dataAvailable) {
-            if (shouldStop && !this.outputSink.flush()) {
+            if (shouldStop) {
               return;
             }
             try {
@@ -234,15 +205,11 @@ public abstract class OutputSink {
       }
     }

-    private void writeBuffer(WALSplitter.RegionEntryBuffer buffer) throws IOException {
+    private void writeBuffer(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
       outputSink.append(buffer);
     }

-    void setShouldStop(boolean shouldStop) {
-      this.shouldStop = shouldStop;
-    }
-
-    void finish() {
+    private void finish() {
       synchronized (controller.dataAvailable) {
         shouldStop = true;
         controller.dataAvailable.notifyAll();
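Note that updateRegionMaximumEditLogSeqNum() and its synchronized map leave OutputSink entirely; the recovered-edits sinks now keep the per-region maximum sequence id themselves. A minimal plain-Java sketch of equivalent bookkeeping (the class and method names here are illustrative, not part of the patch):

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Per-region max-sequence-id tracking on a ConcurrentMap. merge() does
// atomically what the removed synchronized block did: keep the largest
// sequence id seen for each encoded region name.
class RegionSeqIdTracker {
  private final ConcurrentMap<String, Long> regionMaximumEditLogSeqNum =
      new ConcurrentHashMap<>();

  void noteSequenceId(String encodedRegionName, long sequenceId) {
    regionMaximumEditLogSeqNum.merge(encodedRegionName, sequenceId, Math::max);
  }

  Long maxSeqNum(String encodedRegionName) {
    return regionMaximumEditLogSeqNum.get(encodedRegionName);
  }
}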
RecoveredEditsOutputSink.java (new file):

@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.wal;
+
+import java.io.IOException;
+import java.io.InterruptedIOException;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.io.MultipleIOException;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
+
+/**
+ * Class that manages the output streams from the log splitting process.
+ * Every region only has one recovered edits file.
+ */
+@InterfaceAudience.Private
+class RecoveredEditsOutputSink extends AbstractRecoveredEditsOutputSink {
+  private static final Logger LOG = LoggerFactory.getLogger(RecoveredEditsOutputSink.class);
+  private ConcurrentMap<String, RecoveredEditsWriter> writers = new ConcurrentHashMap<>();
+
+  public RecoveredEditsOutputSink(WALSplitter walSplitter,
+      WALSplitter.PipelineController controller, EntryBuffers entryBuffers, int numWriters) {
+    super(walSplitter, controller, entryBuffers, numWriters);
+  }
+
+  @Override
+  public void append(EntryBuffers.RegionEntryBuffer buffer) throws IOException {
+    List<WAL.Entry> entries = buffer.entryBuffer;
+    if (entries.isEmpty()) {
+      LOG.warn("got an empty buffer, skipping");
+      return;
+    }
+    RecoveredEditsWriter writer =
+        getRecoveredEditsWriter(buffer.tableName, buffer.encodedRegionName,
+            entries.get(0).getKey().getSequenceId());
+    if (writer != null) {
+      writer.writeRegionEntries(entries);
+    }
+  }
+
+  /**
+   * Get a writer and path for a log starting at the given entry. This function is threadsafe so
+   * long as multiple threads are always acting on different regions.
+   * @return null if this region shouldn't output any logs
+   */
+  private RecoveredEditsWriter getRecoveredEditsWriter(TableName tableName, byte[] region,
+      long seqId) throws IOException {
+    RecoveredEditsWriter ret = writers.get(Bytes.toString(region));
+    if (ret != null) {
+      return ret;
+    }
+    ret = createRecoveredEditsWriter(tableName, region, seqId);
+    if (ret == null) {
+      return null;
+    }
+    writers.put(Bytes.toString(region), ret);
+    return ret;
+  }
+
+  @Override
+  public List<Path> close() throws IOException {
+    boolean isSuccessful = true;
+    try {
+      isSuccessful &= finishWriterThreads();
+    } finally {
+      isSuccessful &= closeWriters();
+    }
+    return isSuccessful ? splits : null;
+  }
+
+  /**
+   * Close all of the output streams.
+   *
+   * @return true when there is no error.
+   */
+  private boolean closeWriters() throws IOException {
+    List<IOException> thrown = Lists.newArrayList();
+    for (RecoveredEditsWriter writer : writers.values()) {
+      closeCompletionService.submit(() -> {
+        Path dst = closeRecoveredEditsWriter(writer, thrown);
+        splits.add(dst);
+        return null;
+      });
+    }
+    boolean progressFailed = false;
+    try {
+      for (int i = 0, n = this.writers.size(); i < n; i++) {
+        Future<Void> future = closeCompletionService.take();
+        future.get();
+        if (!progressFailed && reporter != null && !reporter.progress()) {
+          progressFailed = true;
+        }
+      }
+    } catch (InterruptedException e) {
+      IOException iie = new InterruptedIOException();
+      iie.initCause(e);
+      throw iie;
+    } catch (ExecutionException e) {
+      throw new IOException(e.getCause());
+    } finally {
+      closeThreadPool.shutdownNow();
+    }
+    if (!thrown.isEmpty()) {
+      throw MultipleIOException.createIOException(thrown);
+    }
+    return !progressFailed;
+  }
+
+  @Override
+  public Map<byte[], Long> getOutputCounts() {
+    TreeMap<byte[], Long> ret = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+    for (Map.Entry<String, RecoveredEditsWriter> entry : writers.entrySet()) {
+      ret.put(Bytes.toBytes(entry.getKey()), entry.getValue().editsWritten);
+    }
+    return ret;
+  }
+
+  @Override
+  public int getNumberOfRecoveredRegions() {
+    return writers.size();
+  }
+
+  @Override
+  int getNumOpenWriters() {
+    return writers.size();
+  }
+}
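closeWriters() above follows the standard CompletionService pattern: submit one close task per writer, then take() exactly as many futures as were submitted, so the method returns only after every close has completed or failed. A self-contained sketch of the same pattern outside HBase (the pool size and the ParallelCloser name are illustrative):

import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Submit one task per resource, then drain exactly that many futures.
// take().get() rethrows the first failure as an ExecutionException.
class ParallelCloser {
  static void closeAll(List<? extends AutoCloseable> writers) throws Exception {
    ExecutorService pool =
        Executors.newFixedThreadPool(Math.max(1, Math.min(writers.size(), 8)));
    CompletionService<Void> completionService = new ExecutorCompletionService<>(pool);
    try {
      for (AutoCloseable writer : writers) {
        completionService.submit(() -> {
          writer.close();
          return null;
        });
      }
      for (int i = 0; i < writers.size(); i++) {
        completionService.take().get();
      }
    } finally {
      pool.shutdownNow();
    }
  }
}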
WALSplitUtil.java:

@@ -163,7 +163,9 @@ public final class WALSplitUtil {
    * named for the sequenceid in the passed <code>logEntry</code>: e.g.
    * /hbase/some_table/2323432434/recovered.edits/2332. This method also ensures existence of
    * RECOVERED_EDITS_DIR under the region creating it if necessary.
-   * @param walEntry walEntry to recover
+   * @param tableName the table name
+   * @param encodedRegionName the encoded region name
+   * @param sedId the sequence id used to generate the file name
    * @param fileNameBeingSplit the file being split currently. Used to generate tmp file name.
    * @param tmpDirName of the directory used to sideline old recovered edits file
    * @param conf configuration
@@ -172,12 +174,12 @@ public final class WALSplitUtil {
    */
   @SuppressWarnings("deprecation")
   @VisibleForTesting
-  static Path getRegionSplitEditsPath(final WAL.Entry walEntry, String fileNameBeingSplit,
-      String tmpDirName, Configuration conf) throws IOException {
+  static Path getRegionSplitEditsPath(TableName tableName, byte[] encodedRegionName, long sedId,
+      String fileNameBeingSplit, String tmpDirName, Configuration conf) throws IOException {
     FileSystem walFS = FSUtils.getWALFileSystem(conf);
-    Path tableDir = FSUtils.getWALTableDir(conf, walEntry.getKey().getTableName());
-    String encodedRegionName = Bytes.toString(walEntry.getKey().getEncodedRegionName());
-    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionName);
+    Path tableDir = FSUtils.getWALTableDir(conf, tableName);
+    String encodedRegionNameStr = Bytes.toString(encodedRegionName);
+    Path regionDir = HRegion.getRegionDir(tableDir, encodedRegionNameStr);
     Path dir = getRegionDirRecoveredEditsDir(regionDir);

     if (walFS.exists(dir) && walFS.isFile(dir)) {
@@ -185,7 +187,7 @@ public final class WALSplitUtil {
       if (!walFS.exists(tmp)) {
         walFS.mkdirs(tmp);
       }
-      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
+      tmp = new Path(tmp, HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionNameStr);
       LOG.warn("Found existing old file: {}. It could be some "
           + "leftover of an old installation. It should be a folder instead. "
           + "So moving it to {}",
@@ -201,7 +203,7 @@ public final class WALSplitUtil {
     // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
     // region's replayRecoveredEdits will not delete it
-    String fileName = formatRecoveredEditsFileName(walEntry.getKey().getSequenceId());
+    String fileName = formatRecoveredEditsFileName(sedId);
     fileName = getTmpRecoveredEditsFileName(fileName + "-" + fileNameBeingSplit);
     return new Path(dir, fileName);
   }
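The call-site change is mechanical: instead of handing over a whole WAL.Entry, callers now pass the table name, encoded region name, and sequence id directly. An illustrative call of the new signature (all values are placeholders; the updated createRecoveredEditsPathForRegion() in TestWALSplit at the end of this diff makes the same call with real fixtures):

// Placeholder values; compare the TestWALSplit change below.
Path editsPath = WALSplitUtil.getRegionSplitEditsPath(
    TableName.valueOf("some_table"), // table the edits belong to
    encodedRegionName,               // byte[]: encoded region name
    2332L,                           // sequence id that names the edits file
    walFileNameBeingSplit,           // appended to avoid duplicate file names
    tmpDirName,                      // sideline directory for stale edits files
    conf);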
WALSplitter.java:

@@ -38,9 +38,7 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.HConstants;
-import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.coordination.SplitLogWorkerCoordination;
-import org.apache.hadoop.hbase.io.HeapSize;
 import org.apache.hadoop.hbase.master.SplitLogManager;
 import org.apache.hadoop.hbase.monitoring.MonitoredTask;
 import org.apache.hadoop.hbase.monitoring.TaskMonitor;
@@ -49,12 +47,10 @@ import org.apache.hadoop.hbase.regionserver.LastSequenceId;
 import org.apache.hadoop.hbase.regionserver.wal.WALCellCodec;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.CancelableProgressable;
-import org.apache.hadoop.hbase.util.ClassSize;
 import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WAL.Entry;
 import org.apache.hadoop.hbase.wal.WAL.Reader;
-import org.apache.hadoop.hbase.wal.WALProvider.Writer;
 import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -107,11 +103,12 @@ public class WALSplitter {
   // the file being split currently
   private FileStatus fileBeingSplit;

-  // if we limit the number of writers opened for sinking recovered edits
-  private final boolean splitWriterCreationBounded;
+  private final String tmpDirName;

   public final static String SPLIT_WRITER_CREATION_BOUNDED = "hbase.split.writer.creation.bounded";
+  public final static String SPLIT_WAL_BUFFER_SIZE = "hbase.regionserver.hlog.splitlog.buffersize";
+  public final static String SPLIT_WAL_WRITER_THREADS =
+      "hbase.regionserver.hlog.splitlog.writer.threads";

   @VisibleForTesting
   WALSplitter(final WALFactory factory, Configuration conf, Path walDir, FileSystem walFS,
@@ -127,20 +124,21 @@ public class WALSplitter {

     this.walFactory = factory;
     PipelineController controller = new PipelineController();
+    this.tmpDirName =
+        conf.get(HConstants.TEMPORARY_FS_DIRECTORY_KEY, HConstants.DEFAULT_TEMPORARY_HDFS_DIRECTORY);

-    this.splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
-
-    entryBuffers = new EntryBuffers(controller,
-        this.conf.getLong("hbase.regionserver.hlog.splitlog.buffersize", 128 * 1024 * 1024),
-        splitWriterCreationBounded);
-
-    int numWriterThreads = this.conf.getInt("hbase.regionserver.hlog.splitlog.writer.threads", 3);
+    // if we limit the number of writers opened for sinking recovered edits
+    boolean splitWriterCreationBounded = conf.getBoolean(SPLIT_WRITER_CREATION_BOUNDED, false);
+    long bufferSize = this.conf.getLong(SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
+    int numWriterThreads = this.conf.getInt(SPLIT_WAL_WRITER_THREADS, 3);
+
     if (splitWriterCreationBounded) {
+      entryBuffers = new BoundedEntryBuffers(controller, bufferSize);
       outputSink =
-          new BoundedLogWriterCreationOutputSink(this, controller, entryBuffers, numWriterThreads);
+          new BoundedRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
     } else {
+      entryBuffers = new EntryBuffers(controller, bufferSize);
       outputSink =
-          new LogRecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
+          new RecoveredEditsOutputSink(this, controller, entryBuffers, numWriterThreads);
     }
   }
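The bounded sink stays opt-in, and the buffer size and writer-thread count now come from named constants instead of inline strings. A sketch of tuning the three settings (the values shown are the defaults used by the constructor above):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

Configuration conf = HBaseConfiguration.create();
// Opt in to BoundedEntryBuffers + BoundedRecoveredEditsOutputSink.
conf.setBoolean(WALSplitter.SPLIT_WRITER_CREATION_BOUNDED, true);
// Total memory for buffered edits across regions; default 128 MB.
conf.setLong(WALSplitter.SPLIT_WAL_BUFFER_SIZE, 128 * 1024 * 1024);
// Writer threads pumping edits from the buffers into the sink; default 3.
conf.setInt(WALSplitter.SPLIT_WAL_WRITER_THREADS, 3);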
@@ -152,6 +150,10 @@ public class WALSplitter {
     return fileBeingSplit;
   }

+  String getTmpDirName() {
+    return this.tmpDirName;
+  }
+
   Map<String, Map<byte[], Long>> getRegionMaxSeqIdInStores() {
     return regionMaxSeqIdInStores;
   }
@@ -215,7 +217,7 @@ public class WALSplitter {
     int interval = conf.getInt("hbase.splitlog.report.interval.loglines", 1024);
     Path logPath = logfile.getPath();
     boolean outputSinkStarted = false;
-    boolean progress_failed = false;
+    boolean progressFailed = false;
     int editsCount = 0;
     int editsSkipped = 0;

@@ -230,7 +232,7 @@ public class WALSplitter {
           logLength);
       status.setStatus("Opening log file");
       if (reporter != null && !reporter.progress()) {
-        progress_failed = true;
+        progressFailed = true;
         return false;
       }
       logFileReader = getReader(logfile, skipErrors, reporter);
@@ -288,11 +290,11 @@ public class WALSplitter {
           if (editsCount % interval == 0
               || moreWritersFromLastCheck > numOpenedFilesBeforeReporting) {
             numOpenedFilesLastCheck = this.getNumOpenWriters();
-            String countsStr = (editsCount - (editsSkipped + outputSink.getSkippedEdits()))
+            String countsStr = (editsCount - (editsSkipped + outputSink.getTotalSkippedEdits()))
                 + " edits, skipped " + editsSkipped + " edits.";
             status.setStatus("Split " + countsStr);
             if (reporter != null && !reporter.progress()) {
-              progress_failed = true;
+              progressFailed = true;
               return false;
             }
           }
@@ -326,9 +328,9 @@ public class WALSplitter {
     try {
       if (outputSinkStarted) {
         // Set progress_failed to true as the immediate following statement will reset its value
-        // when finishWritingAndClose() throws exception, progress_failed has the right value
-        progress_failed = true;
-        progress_failed = outputSink.finishWritingAndClose() == null;
+        // when close() throws exception, progress_failed has the right value
+        progressFailed = true;
+        progressFailed = outputSink.close() == null;
       }
     } finally {
       long processCost = EnvironmentEdgeManager.currentTime() - startTS;
@@ -337,18 +339,18 @@ public class WALSplitter {
           outputSink.getNumberOfRecoveredRegions() + " regions cost " + processCost +
           " ms; edits skipped=" + editsSkipped + "; WAL=" + logPath + ", size=" +
           StringUtils.humanSize(logfile.getLen()) + ", length=" + logfile.getLen() +
-          ", corrupted=" + isCorrupted + ", progress failed=" + progress_failed;
+          ", corrupted=" + isCorrupted + ", progress failed=" + progressFailed;
       LOG.info(msg);
       status.markComplete(msg);
     }
   }
-    return !progress_failed;
+    return !progressFailed;
   }

   /**
    * Create a new {@link Reader} for reading logs to split.
    */
-  protected Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
+  private Reader getReader(FileStatus file, boolean skipErrors, CancelableProgressable reporter)
       throws IOException, CorruptedLogFileException {
     Path path = file.getPath();
     long length = file.getLen();
@@ -392,7 +394,7 @@ public class WALSplitter {
     return in;
   }

-  static private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
+  private Entry getNextLogLine(Reader in, Path path, boolean skipErrors)
       throws CorruptedLogFileException, IOException {
     try {
       return in.next();
@@ -475,98 +477,6 @@ public class WALSplitter {
     }
   }

-  /**
-   * A buffer of some number of edits for a given region.
-   * This accumulates edits and also provides a memory optimization in order to
-   * share a single byte array instance for the table and region name.
-   * Also tracks memory usage of the accumulated edits.
-   */
-  public static class RegionEntryBuffer implements HeapSize {
-    long heapInBuffer = 0;
-    List<Entry> entryBuffer;
-    TableName tableName;
-    byte[] encodedRegionName;
-
-    RegionEntryBuffer(TableName tableName, byte[] region) {
-      this.tableName = tableName;
-      this.encodedRegionName = region;
-      this.entryBuffer = new ArrayList<>();
-    }
-
-    long appendEntry(Entry entry) {
-      internify(entry);
-      entryBuffer.add(entry);
-      long incrHeap = entry.getEdit().heapSize() +
-          ClassSize.align(2 * ClassSize.REFERENCE) + // WALKey pointers
-          0; // TODO linkedlist entry
-      heapInBuffer += incrHeap;
-      return incrHeap;
-    }
-
-    private void internify(Entry entry) {
-      WALKeyImpl k = entry.getKey();
-      k.internTableName(this.tableName);
-      k.internEncodedRegionName(this.encodedRegionName);
-    }
-
-    @Override
-    public long heapSize() {
-      return heapInBuffer;
-    }
-
-    public byte[] getEncodedRegionName() {
-      return encodedRegionName;
-    }
-
-    public List<Entry> getEntryBuffer() {
-      return entryBuffer;
-    }
-
-    public TableName getTableName() {
-      return tableName;
-    }
-  }
-
-  /**
-   * Class wraps the actual writer which writes data out and related statistics
-   */
-  public abstract static class SinkWriter {
-    /* Count of edits written to this path */
-    long editsWritten = 0;
-    /* Count of edits skipped to this path */
-    long editsSkipped = 0;
-    /* Number of nanos spent writing to this log */
-    long nanosSpent = 0;
-
-    void incrementEdits(int edits) {
-      editsWritten += edits;
-    }
-
-    void incrementSkippedEdits(int skipped) {
-      editsSkipped += skipped;
-    }
-
-    void incrementNanoTime(long nanos) {
-      nanosSpent += nanos;
-    }
-  }
-
-  /**
-   * Private data structure that wraps a Writer and its Path, also collecting statistics about the
-   * data written to this output.
-   */
-  final static class WriterAndPath extends SinkWriter {
-    final Path path;
-    final Writer writer;
-    final long minLogSeqNum;
-
-    WriterAndPath(final Path path, final Writer writer, final long minLogSeqNum) {
-      this.path = path;
-      this.writer = writer;
-      this.minLogSeqNum = minLogSeqNum;
-    }
-  }
-
   static class CorruptedLogFileException extends Exception {
     private static final long serialVersionUID = 1L;

@@ -583,6 +493,5 @@ public class WALSplitter {
     CorruptedLogFileException(String message, Throwable cause) {
       super(message, cause);
     }
-
   }
 }
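With RegionEntryBuffer, SinkWriter, and WriterAndPath gone, WALSplitter no longer hosts output-side helpers: RegionEntryBuffer now nests under EntryBuffers, and the per-writer statistics fields (editsWritten and friends) live with the recovered-edits writers. The test updates below follow the new nesting; in caller code the change is a one-line rename (TEST_TABLE and TEST_REGION stand for any table name and encoded region name):

// Before: new WALSplitter.RegionEntryBuffer(TEST_TABLE, TEST_REGION)
EntryBuffers.RegionEntryBuffer buffer =
    new EntryBuffers.RegionEntryBuffer(TEST_TABLE, TEST_REGION);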
TestWALMethods.java:

@@ -40,7 +40,6 @@ import org.apache.hadoop.hbase.testclassification.SmallTests;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.FSUtils;
 import org.apache.hadoop.hbase.wal.WALSplitter.PipelineController;
-import org.apache.hadoop.hbase.wal.WALSplitter.RegionEntryBuffer;
 import org.junit.ClassRule;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -134,7 +133,7 @@ public class TestWALMethods {

   @Test
   public void testRegionEntryBuffer() throws Exception {
-    WALSplitter.RegionEntryBuffer reb = new WALSplitter.RegionEntryBuffer(
+    EntryBuffers.RegionEntryBuffer reb = new EntryBuffers.RegionEntryBuffer(
         TEST_TABLE, TEST_REGION);
     assertEquals(0, reb.heapSize());

@@ -153,7 +152,7 @@ public class TestWALMethods {
     assertTrue(sink.totalBuffered > 0);
     long amountInChunk = sink.totalBuffered;
     // Get a chunk
-    RegionEntryBuffer chunk = sink.getChunkToWrite();
+    EntryBuffers.RegionEntryBuffer chunk = sink.getChunkToWrite();
     assertEquals(chunk.heapSize(), amountInChunk);

     // Make sure it got marked that a thread is "working on this"
@@ -172,7 +171,7 @@ public class TestWALMethods {
     // to get the second
     sink.doneWriting(chunk);

-    RegionEntryBuffer chunk2 = sink.getChunkToWrite();
+    EntryBuffers.RegionEntryBuffer chunk2 = sink.getChunkToWrite();
     assertNotNull(chunk2);
     assertNotSame(chunk, chunk2);
     long amountInChunk2 = sink.totalBuffered;
TestWALSplit.java:

@@ -407,15 +407,15 @@ public class TestWALSplit {
     WALFactory.createRecoveredEditsWriter(fs, p, conf).close();
   }

-  private Path createRecoveredEditsPathForRegion() throws IOException{
+  private Path createRecoveredEditsPathForRegion() throws IOException {
     byte[] encoded = RegionInfoBuilder.FIRST_META_REGIONINFO.getEncodedNameAsBytes();
     long now = System.currentTimeMillis();
-    Entry entry =
-        new Entry(new WALKeyImpl(encoded,
-            TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
-        new WALEdit());
-    Path p = WALSplitUtil.getRegionSplitEditsPath(entry,
-        FILENAME_BEING_SPLIT, TMPDIRNAME, conf);
+    Entry entry = new Entry(
+        new WALKeyImpl(encoded, TableName.META_TABLE_NAME, 1, now, HConstants.DEFAULT_CLUSTER_ID),
+        new WALEdit());
+    Path p = WALSplitUtil
+        .getRegionSplitEditsPath(TableName.META_TABLE_NAME, encoded, 1, FILENAME_BEING_SPLIT,
+          TMPDIRNAME, conf);
     return p;
   }