HBASE-19613 Miscellaneous changes to WALSplitter.
* Use ArrayList instead LinkedList * Use Apache Commons where appropriate * Parameterize and improve logging
This commit is contained in:
parent
6e136f26bf
commit
301062566a
|
@ -24,9 +24,9 @@ import java.io.IOException;
|
||||||
import java.io.InterruptedIOException;
|
import java.io.InterruptedIOException;
|
||||||
import java.text.ParseException;
|
import java.text.ParseException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.LinkedList;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.NavigableSet;
|
import java.util.NavigableSet;
|
||||||
|
@ -48,6 +48,9 @@ import java.util.concurrent.atomic.AtomicReference;
|
||||||
import java.util.regex.Matcher;
|
import java.util.regex.Matcher;
|
||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
|
import org.apache.commons.collections.CollectionUtils;
|
||||||
|
import org.apache.commons.collections.MapUtils;
|
||||||
|
import org.apache.commons.lang3.ArrayUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
import org.apache.hadoop.fs.FileAlreadyExistsException;
|
||||||
import org.apache.hadoop.fs.FileStatus;
|
import org.apache.hadoop.fs.FileStatus;
|
||||||
|
@ -86,14 +89,14 @@ import org.apache.hadoop.hbase.wal.WALProvider.Writer;
|
||||||
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
import org.apache.hadoop.hbase.zookeeper.ZKSplitLog;
|
||||||
import org.apache.hadoop.io.MultipleIOException;
|
import org.apache.hadoop.io.MultipleIOException;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
import org.apache.hbase.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hbase.thirdparty.com.google.common.base.Preconditions;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
import org.apache.hbase.thirdparty.com.google.protobuf.TextFormat;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WALEntry;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.MutationProto.MutationType;
|
||||||
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
|
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionStoreSequenceIds;
|
||||||
|
@ -203,7 +206,7 @@ public class WALSplitter {
|
||||||
final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
|
final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
|
||||||
Collections.singletonList(logDir), null);
|
Collections.singletonList(logDir), null);
|
||||||
List<Path> splits = new ArrayList<>();
|
List<Path> splits = new ArrayList<>();
|
||||||
if (logfiles != null && logfiles.length > 0) {
|
if (ArrayUtils.isNotEmpty(logfiles)) {
|
||||||
for (FileStatus logfile: logfiles) {
|
for (FileStatus logfile: logfiles) {
|
||||||
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
|
WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
|
||||||
if (s.splitLogFile(logfile, null)) {
|
if (s.splitLogFile(logfile, null)) {
|
||||||
|
@ -245,7 +248,7 @@ public class WALSplitter {
|
||||||
this.fileBeingSplit = logfile;
|
this.fileBeingSplit = logfile;
|
||||||
try {
|
try {
|
||||||
long logLength = logfile.getLen();
|
long logLength = logfile.getLen();
|
||||||
LOG.info("Splitting WAL=" + logPath + ", length=" + logLength);
|
LOG.info("Splitting WAL={}, length={}", logPath, logLength);
|
||||||
status.setStatus("Opening log file");
|
status.setStatus("Opening log file");
|
||||||
if (reporter != null && !reporter.progress()) {
|
if (reporter != null && !reporter.progress()) {
|
||||||
progress_failed = true;
|
progress_failed = true;
|
||||||
|
@ -253,7 +256,7 @@ public class WALSplitter {
|
||||||
}
|
}
|
||||||
logFileReader = getReader(logfile, skipErrors, reporter);
|
logFileReader = getReader(logfile, skipErrors, reporter);
|
||||||
if (logFileReader == null) {
|
if (logFileReader == null) {
|
||||||
LOG.warn("Nothing to split in WAL=" + logPath);
|
LOG.warn("Nothing to split in WAL={}", logPath);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
|
int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
|
||||||
|
@ -317,7 +320,7 @@ public class WALSplitter {
|
||||||
iie.initCause(ie);
|
iie.initCause(ie);
|
||||||
throw iie;
|
throw iie;
|
||||||
} catch (CorruptedLogFileException e) {
|
} catch (CorruptedLogFileException e) {
|
||||||
LOG.warn("Could not parse, corrupted WAL=" + logPath, e);
|
LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
|
||||||
if (splitLogWorkerCoordination != null) {
|
if (splitLogWorkerCoordination != null) {
|
||||||
// Some tests pass in a csm of null.
|
// Some tests pass in a csm of null.
|
||||||
splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs);
|
splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs);
|
||||||
|
@ -330,14 +333,13 @@ public class WALSplitter {
|
||||||
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
|
e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
|
||||||
throw e;
|
throw e;
|
||||||
} finally {
|
} finally {
|
||||||
LOG.debug("Finishing writing output logs and closing down.");
|
LOG.debug("Finishing writing output logs and closing down");
|
||||||
try {
|
try {
|
||||||
if (null != logFileReader) {
|
if (null != logFileReader) {
|
||||||
logFileReader.close();
|
logFileReader.close();
|
||||||
}
|
}
|
||||||
} catch (IOException exception) {
|
} catch (IOException exception) {
|
||||||
LOG.warn("Could not close WAL reader: " + exception.getMessage());
|
LOG.warn("Could not close WAL reader", exception);
|
||||||
LOG.debug("exception details", exception);
|
|
||||||
}
|
}
|
||||||
try {
|
try {
|
||||||
if (outputSinkStarted) {
|
if (outputSinkStarted) {
|
||||||
|
@ -417,11 +419,11 @@ public class WALSplitter {
|
||||||
final FileSystem fs, final Configuration conf) throws IOException {
|
final FileSystem fs, final Configuration conf) throws IOException {
|
||||||
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
|
final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
|
||||||
if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
|
if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
|
||||||
LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to "
|
LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
|
||||||
+ corruptDir);
|
corruptDir);
|
||||||
}
|
}
|
||||||
if (!fs.mkdirs(corruptDir)) {
|
if (!fs.mkdirs(corruptDir)) {
|
||||||
LOG.info("Unable to mkdir " + corruptDir);
|
LOG.info("Unable to mkdir {}", corruptDir);
|
||||||
}
|
}
|
||||||
fs.mkdirs(oldLogDir);
|
fs.mkdirs(oldLogDir);
|
||||||
|
|
||||||
|
@ -431,9 +433,9 @@ public class WALSplitter {
|
||||||
Path p = new Path(corruptDir, corrupted.getName());
|
Path p = new Path(corruptDir, corrupted.getName());
|
||||||
if (fs.exists(corrupted)) {
|
if (fs.exists(corrupted)) {
|
||||||
if (!fs.rename(corrupted, p)) {
|
if (!fs.rename(corrupted, p)) {
|
||||||
LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
|
LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Moved corrupted log " + corrupted + " to " + p);
|
LOG.warn("Moved corrupted log {} to {}", corrupted, p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -442,9 +444,9 @@ public class WALSplitter {
|
||||||
Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
|
Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
|
||||||
if (fs.exists(p)) {
|
if (fs.exists(p)) {
|
||||||
if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
|
if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
|
||||||
LOG.warn("Unable to move " + p + " to " + newPath);
|
LOG.warn("Unable to move {} to {}", p, newPath);
|
||||||
} else {
|
} else {
|
||||||
LOG.info("Archived processed log " + p + " to " + newPath);
|
LOG.info("Archived processed log {} to {}", p, newPath);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -474,9 +476,9 @@ public class WALSplitter {
|
||||||
Path dir = getRegionDirRecoveredEditsDir(regiondir);
|
Path dir = getRegionDirRecoveredEditsDir(regiondir);
|
||||||
|
|
||||||
if (!fs.exists(regiondir)) {
|
if (!fs.exists(regiondir)) {
|
||||||
LOG.info("This region's directory doesn't exist: "
|
LOG.info("This region's directory does not exist: {}."
|
||||||
+ regiondir.toString() + ". It is very likely that it was" +
|
+ "It is very likely that it was already split so it is "
|
||||||
" already split so it's safe to discard those edits.");
|
+ "safe to discard those edits.", regiondir);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (fs.exists(dir) && fs.isFile(dir)) {
|
if (fs.exists(dir) && fs.isFile(dir)) {
|
||||||
|
@ -486,16 +488,16 @@ public class WALSplitter {
|
||||||
}
|
}
|
||||||
tmp = new Path(tmp,
|
tmp = new Path(tmp,
|
||||||
HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
|
HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
|
||||||
LOG.warn("Found existing old file: " + dir + ". It could be some "
|
LOG.warn("Found existing old file: {}. It could be some "
|
||||||
+ "leftover of an old installation. It should be a folder instead. "
|
+ "leftover of an old installation. It should be a folder instead. "
|
||||||
+ "So moving it to " + tmp);
|
+ "So moving it to {}", dir, tmp);
|
||||||
if (!fs.rename(dir, tmp)) {
|
if (!fs.rename(dir, tmp)) {
|
||||||
LOG.warn("Failed to sideline old file " + dir);
|
LOG.warn("Failed to sideline old file {}", dir);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!fs.exists(dir) && !fs.mkdirs(dir)) {
|
if (!fs.exists(dir) && !fs.mkdirs(dir)) {
|
||||||
LOG.warn("mkdir failed on " + dir);
|
LOG.warn("mkdir failed on {}", dir);
|
||||||
}
|
}
|
||||||
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
|
// Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
|
||||||
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
|
// Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
|
||||||
|
@ -554,8 +556,9 @@ public class WALSplitter {
|
||||||
final Path regiondir) throws IOException {
|
final Path regiondir) throws IOException {
|
||||||
NavigableSet<Path> filesSorted = new TreeSet<>();
|
NavigableSet<Path> filesSorted = new TreeSet<>();
|
||||||
Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
|
Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
|
||||||
if (!fs.exists(editsdir))
|
if (!fs.exists(editsdir)) {
|
||||||
return filesSorted;
|
return filesSorted;
|
||||||
|
}
|
||||||
FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
|
FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
|
||||||
@Override
|
@Override
|
||||||
public boolean accept(Path p) {
|
public boolean accept(Path p) {
|
||||||
|
@ -577,16 +580,13 @@ public class WALSplitter {
|
||||||
result = false;
|
result = false;
|
||||||
}
|
}
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
LOG.warn("Failed isFile check on " + p);
|
LOG.warn("Failed isFile check on {}", p, e);
|
||||||
}
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (files == null) {
|
if (ArrayUtils.isNotEmpty(files)) {
|
||||||
return filesSorted;
|
Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
|
||||||
}
|
|
||||||
for (FileStatus status : files) {
|
|
||||||
filesSorted.add(status.getPath());
|
|
||||||
}
|
}
|
||||||
return filesSorted;
|
return filesSorted;
|
||||||
}
|
}
|
||||||
|
@ -605,7 +605,7 @@ public class WALSplitter {
|
||||||
Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
|
Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
|
||||||
+ System.currentTimeMillis());
|
+ System.currentTimeMillis());
|
||||||
if (!fs.rename(edits, moveAsideName)) {
|
if (!fs.rename(edits, moveAsideName)) {
|
||||||
LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
|
LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
|
||||||
}
|
}
|
||||||
return moveAsideName;
|
return moveAsideName;
|
||||||
}
|
}
|
||||||
|
@ -655,7 +655,7 @@ public class WALSplitter {
|
||||||
- SEQUENCE_ID_FILE_SUFFIX_LENGTH));
|
- SEQUENCE_ID_FILE_SUFFIX_LENGTH));
|
||||||
maxSeqId = Math.max(tmpSeqId, maxSeqId);
|
maxSeqId = Math.max(tmpSeqId, maxSeqId);
|
||||||
} catch (NumberFormatException ex) {
|
} catch (NumberFormatException ex) {
|
||||||
LOG.warn("Invalid SeqId File Name=" + fileName);
|
LOG.warn("Invalid SeqId File Name={}", fileName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -672,10 +672,8 @@ public class WALSplitter {
|
||||||
if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
|
if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
|
||||||
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
|
throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
|
||||||
}
|
}
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Wrote file={}, newSeqId={}, maxSeqId={}", newSeqIdFile,
|
||||||
LOG.debug("Wrote file=" + newSeqIdFile + ", newSeqId=" + newSeqId
|
newSeqId, maxSeqId);
|
||||||
+ ", maxSeqId=" + maxSeqId);
|
|
||||||
}
|
|
||||||
} catch (FileAlreadyExistsException ignored) {
|
} catch (FileAlreadyExistsException ignored) {
|
||||||
// latest hdfs throws this exception. it's all right if newSeqIdFile already exists
|
// latest hdfs throws this exception. it's all right if newSeqIdFile already exists
|
||||||
}
|
}
|
||||||
|
@ -683,12 +681,11 @@ public class WALSplitter {
|
||||||
// remove old ones
|
// remove old ones
|
||||||
if (files != null) {
|
if (files != null) {
|
||||||
for (FileStatus status : files) {
|
for (FileStatus status : files) {
|
||||||
if (newSeqIdFile.equals(status.getPath())) {
|
if (!newSeqIdFile.equals(status.getPath())) {
|
||||||
continue;
|
|
||||||
}
|
|
||||||
fs.delete(status.getPath(), false);
|
fs.delete(status.getPath(), false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
return newSeqId;
|
return newSeqId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -710,7 +707,7 @@ public class WALSplitter {
|
||||||
// zero length even if the file has been sync'd. Revisit if HDFS-376 or
|
// zero length even if the file has been sync'd. Revisit if HDFS-376 or
|
||||||
// HDFS-878 is committed.
|
// HDFS-878 is committed.
|
||||||
if (length <= 0) {
|
if (length <= 0) {
|
||||||
LOG.warn("File " + path + " might be still open, length is 0");
|
LOG.warn("File {} might be still open, length is 0", path);
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
@ -724,17 +721,15 @@ public class WALSplitter {
|
||||||
// ignore if this is the last log in sequence.
|
// ignore if this is the last log in sequence.
|
||||||
// TODO is this scenario still possible if the log has been
|
// TODO is this scenario still possible if the log has been
|
||||||
// recovered (i.e. closed)
|
// recovered (i.e. closed)
|
||||||
LOG.warn("Could not open " + path + " for reading. File is empty", e);
|
LOG.warn("Could not open {} for reading. File is empty", path, e);
|
||||||
return null;
|
}
|
||||||
} else {
|
|
||||||
// EOFException being ignored
|
// EOFException being ignored
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
if (e instanceof FileNotFoundException) {
|
if (e instanceof FileNotFoundException) {
|
||||||
// A wal file may not exist anymore. Nothing can be recovered so move on
|
// A wal file may not exist anymore. Nothing can be recovered so move on
|
||||||
LOG.warn("File " + path + " doesn't exist anymore.", e);
|
LOG.warn("File {} does not exist anymore", path, e);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (!skipErrors || e instanceof InterruptedIOException) {
|
if (!skipErrors || e instanceof InterruptedIOException) {
|
||||||
|
@ -755,7 +750,7 @@ public class WALSplitter {
|
||||||
return in.next();
|
return in.next();
|
||||||
} catch (EOFException eof) {
|
} catch (EOFException eof) {
|
||||||
// truncated files are expected if a RS crashes (see HBASE-2643)
|
// truncated files are expected if a RS crashes (see HBASE-2643)
|
||||||
LOG.info("EOF from wal " + path + ". continuing");
|
LOG.info("EOF from wal {}. Continuing.", path);
|
||||||
return null;
|
return null;
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
// If the IOE resulted from bad file format,
|
// If the IOE resulted from bad file format,
|
||||||
|
@ -763,8 +758,7 @@ public class WALSplitter {
|
||||||
if (e.getCause() != null &&
|
if (e.getCause() != null &&
|
||||||
(e.getCause() instanceof ParseException ||
|
(e.getCause() instanceof ParseException ||
|
||||||
e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
|
e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
|
||||||
LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
|
LOG.warn("Parse exception from wal {}. Continuing", path, e);
|
||||||
+ path + ". continuing");
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
if (!skipErrors) {
|
if (!skipErrors) {
|
||||||
|
@ -893,8 +887,7 @@ public class WALSplitter {
|
||||||
synchronized (controller.dataAvailable) {
|
synchronized (controller.dataAvailable) {
|
||||||
totalBuffered += incrHeap;
|
totalBuffered += incrHeap;
|
||||||
while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
|
while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
|
||||||
LOG.debug("Used " + totalBuffered +
|
LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
|
||||||
" bytes of buffered edits, waiting for IO threads...");
|
|
||||||
controller.dataAvailable.wait(2000);
|
controller.dataAvailable.wait(2000);
|
||||||
}
|
}
|
||||||
controller.dataAvailable.notifyAll();
|
controller.dataAvailable.notifyAll();
|
||||||
|
@ -980,7 +973,7 @@ public class WALSplitter {
|
||||||
RegionEntryBuffer(TableName tableName, byte[] region) {
|
RegionEntryBuffer(TableName tableName, byte[] region) {
|
||||||
this.tableName = tableName;
|
this.tableName = tableName;
|
||||||
this.encodedRegionName = region;
|
this.encodedRegionName = region;
|
||||||
this.entryBuffer = new LinkedList<>();
|
this.entryBuffer = new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
long appendEntry(Entry entry) {
|
long appendEntry(Entry entry) {
|
||||||
|
@ -1041,7 +1034,7 @@ public class WALSplitter {
|
||||||
}
|
}
|
||||||
|
|
||||||
private void doRun() throws IOException {
|
private void doRun() throws IOException {
|
||||||
if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
|
LOG.trace("Writer thread starting");
|
||||||
while (true) {
|
while (true) {
|
||||||
RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
|
RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
|
||||||
if (buffer == null) {
|
if (buffer == null) {
|
||||||
|
@ -1190,7 +1183,7 @@ public class WALSplitter {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
controller.checkForErrors();
|
controller.checkForErrors();
|
||||||
LOG.info(this.writerThreads.size() + " split writers finished; closing...");
|
LOG.info("{} split writers finished; closing.", this.writerThreads.size());
|
||||||
return (!progress_failed);
|
return (!progress_failed);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1257,7 +1250,7 @@ public class WALSplitter {
|
||||||
} finally {
|
} finally {
|
||||||
result = close();
|
result = close();
|
||||||
List<IOException> thrown = closeLogWriters(null);
|
List<IOException> thrown = closeLogWriters(null);
|
||||||
if (thrown != null && !thrown.isEmpty()) {
|
if (CollectionUtils.isNotEmpty(thrown)) {
|
||||||
throw MultipleIOException.createIOException(thrown);
|
throw MultipleIOException.createIOException(thrown);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1276,25 +1269,22 @@ public class WALSplitter {
|
||||||
dstMinLogSeqNum = entry.getKey().getSequenceId();
|
dstMinLogSeqNum = entry.getKey().getSequenceId();
|
||||||
}
|
}
|
||||||
} catch (EOFException e) {
|
} catch (EOFException e) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
|
||||||
LOG.debug(
|
dst, e);
|
||||||
"Got EOF when reading first WAL entry from " + dst + ", an empty or broken WAL file?",
|
|
||||||
e);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
if (wap.minLogSeqNum < dstMinLogSeqNum) {
|
if (wap.minLogSeqNum < dstMinLogSeqNum) {
|
||||||
LOG.warn("Found existing old edits file. It could be the result of a previous failed"
|
LOG.warn("Found existing old edits file. It could be the result of a previous failed"
|
||||||
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
|
+ " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
|
||||||
+ fs.getFileStatus(dst).getLen());
|
+ fs.getFileStatus(dst).getLen());
|
||||||
if (!fs.delete(dst, false)) {
|
if (!fs.delete(dst, false)) {
|
||||||
LOG.warn("Failed deleting of old " + dst);
|
LOG.warn("Failed deleting of old {}", dst);
|
||||||
throw new IOException("Failed deleting of old " + dst);
|
throw new IOException("Failed deleting of old " + dst);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
|
LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
|
||||||
+ ", length=" + fs.getFileStatus(wap.p).getLen());
|
+ ", length=" + fs.getFileStatus(wap.p).getLen());
|
||||||
if (!fs.delete(wap.p, false)) {
|
if (!fs.delete(wap.p, false)) {
|
||||||
LOG.warn("Failed deleting of " + wap.p);
|
LOG.warn("Failed deleting of {}", wap.p);
|
||||||
throw new IOException("Failed deleting of " + wap.p);
|
throw new IOException("Failed deleting of " + wap.p);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1377,13 +1367,11 @@ public class WALSplitter {
|
||||||
|
|
||||||
Path closeWriter(String encodedRegionName, WriterAndPath wap,
|
Path closeWriter(String encodedRegionName, WriterAndPath wap,
|
||||||
List<IOException> thrown) throws IOException{
|
List<IOException> thrown) throws IOException{
|
||||||
if (LOG.isTraceEnabled()) {
|
LOG.trace("Closing {}", wap.p);
|
||||||
LOG.trace("Closing " + wap.p);
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
wap.w.close();
|
wap.w.close();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.error("Couldn't close log at " + wap.p, ioe);
|
LOG.error("Could not close log at {}", wap.p, ioe);
|
||||||
thrown.add(ioe);
|
thrown.add(ioe);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -1395,7 +1383,7 @@ public class WALSplitter {
|
||||||
if (wap.editsWritten == 0) {
|
if (wap.editsWritten == 0) {
|
||||||
// just remove the empty recovered.edits file
|
// just remove the empty recovered.edits file
|
||||||
if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
|
if (fs.exists(wap.p) && !fs.delete(wap.p, false)) {
|
||||||
LOG.warn("Failed deleting empty " + wap.p);
|
LOG.warn("Failed deleting empty {}", wap.p);
|
||||||
throw new IOException("Failed deleting empty " + wap.p);
|
throw new IOException("Failed deleting empty " + wap.p);
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
|
@ -1414,10 +1402,10 @@ public class WALSplitter {
|
||||||
if (!fs.rename(wap.p, dst)) {
|
if (!fs.rename(wap.p, dst)) {
|
||||||
throw new IOException("Failed renaming " + wap.p + " to " + dst);
|
throw new IOException("Failed renaming " + wap.p + " to " + dst);
|
||||||
}
|
}
|
||||||
LOG.info("Rename " + wap.p + " to " + dst);
|
LOG.info("Rename {} to {}", wap.p, dst);
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.error("Couldn't rename " + wap.p + " to " + dst, ioe);
|
LOG.error("Could not rename {} to {}", wap.p, dst, ioe);
|
||||||
thrown.add(ioe);
|
thrown.add(ioe);
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
@ -1428,7 +1416,6 @@ public class WALSplitter {
|
||||||
if (writersClosed) {
|
if (writersClosed) {
|
||||||
return thrown;
|
return thrown;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (thrown == null) {
|
if (thrown == null) {
|
||||||
thrown = Lists.newArrayList();
|
thrown = Lists.newArrayList();
|
||||||
}
|
}
|
||||||
|
@ -1453,7 +1440,7 @@ public class WALSplitter {
|
||||||
wap = (WriterAndPath) tmpWAP;
|
wap = (WriterAndPath) tmpWAP;
|
||||||
wap.w.close();
|
wap.w.close();
|
||||||
} catch (IOException ioe) {
|
} catch (IOException ioe) {
|
||||||
LOG.error("Couldn't close log at " + wap.p, ioe);
|
LOG.error("Couldn't close log at {}", wap.p, ioe);
|
||||||
thrown.add(ioe);
|
thrown.add(ioe);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
|
@ -1508,18 +1495,18 @@ public class WALSplitter {
|
||||||
+ "result of a previous failed split attempt. Deleting " + regionedits + ", length="
|
+ "result of a previous failed split attempt. Deleting " + regionedits + ", length="
|
||||||
+ fs.getFileStatus(regionedits).getLen());
|
+ fs.getFileStatus(regionedits).getLen());
|
||||||
if (!fs.delete(regionedits, false)) {
|
if (!fs.delete(regionedits, false)) {
|
||||||
LOG.warn("Failed delete of old " + regionedits);
|
LOG.warn("Failed delete of old {}", regionedits);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
Writer w = createWriter(regionedits);
|
Writer w = createWriter(regionedits);
|
||||||
LOG.debug("Creating writer path=" + regionedits);
|
LOG.debug("Creating writer path={}", regionedits);
|
||||||
return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
|
return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
|
||||||
}
|
}
|
||||||
|
|
||||||
void filterCellByStore(Entry logEntry) {
|
void filterCellByStore(Entry logEntry) {
|
||||||
Map<byte[], Long> maxSeqIdInStores =
|
Map<byte[], Long> maxSeqIdInStores =
|
||||||
regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
|
regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
|
||||||
if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
|
if (MapUtils.isEmpty(maxSeqIdInStores)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
// Create the array list for the cells that aren't filtered.
|
// Create the array list for the cells that aren't filtered.
|
||||||
|
@ -1567,11 +1554,9 @@ public class WALSplitter {
|
||||||
if (wap == null) {
|
if (wap == null) {
|
||||||
wap = getWriterAndPath(logEntry, reusable);
|
wap = getWriterAndPath(logEntry, reusable);
|
||||||
if (wap == null) {
|
if (wap == null) {
|
||||||
if (LOG.isTraceEnabled()) {
|
|
||||||
// This log spews the full edit. Can be massive in the log. Enable only debugging
|
// This log spews the full edit. Can be massive in the log. Enable only debugging
|
||||||
// WAL lost edit issues.
|
// WAL lost edit issues.
|
||||||
LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry);
|
LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
|
||||||
}
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1590,7 +1575,7 @@ public class WALSplitter {
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
e = e instanceof RemoteException ?
|
e = e instanceof RemoteException ?
|
||||||
((RemoteException)e).unwrapRemoteException() : e;
|
((RemoteException)e).unwrapRemoteException() : e;
|
||||||
LOG.error(HBaseMarkers.FATAL, " Got while writing log entry to log", e);
|
LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
|
||||||
throw e;
|
throw e;
|
||||||
}
|
}
|
||||||
return wap;
|
return wap;
|
||||||
|
@ -1599,8 +1584,8 @@ public class WALSplitter {
|
||||||
@Override
|
@Override
|
||||||
public boolean keepRegionEvent(Entry entry) {
|
public boolean keepRegionEvent(Entry entry) {
|
||||||
ArrayList<Cell> cells = entry.getEdit().getCells();
|
ArrayList<Cell> cells = entry.getEdit().getCells();
|
||||||
for (int i = 0; i < cells.size(); i++) {
|
for (Cell cell : cells) {
|
||||||
if (WALEdit.isCompactionMarker(cells.get(i))) {
|
if (WALEdit.isCompactionMarker(cell)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1657,7 +1642,7 @@ public class WALSplitter {
|
||||||
List<IOException> thrown, List<Path> paths)
|
List<IOException> thrown, List<Path> paths)
|
||||||
throws InterruptedException, ExecutionException {
|
throws InterruptedException, ExecutionException {
|
||||||
for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
|
for (final Map.Entry<byte[], RegionEntryBuffer> buffer : entryBuffers.buffers.entrySet()) {
|
||||||
LOG.info("Submitting writeThenClose of " + buffer.getValue().encodedRegionName);
|
LOG.info("Submitting writeThenClose of {}", buffer.getValue().encodedRegionName);
|
||||||
completionService.submit(new Callable<Void>() {
|
completionService.submit(new Callable<Void>() {
|
||||||
public Void call() throws Exception {
|
public Void call() throws Exception {
|
||||||
Path dst = writeThenClose(buffer.getValue());
|
Path dst = writeThenClose(buffer.getValue());
|
||||||
|
@ -1835,7 +1820,7 @@ public class WALSplitter {
|
||||||
|
|
||||||
if (entry == null) {
|
if (entry == null) {
|
||||||
// return an empty array
|
// return an empty array
|
||||||
return new ArrayList<>();
|
return Collections.emptyList();
|
||||||
}
|
}
|
||||||
|
|
||||||
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
|
long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
|
||||||
|
@ -1846,7 +1831,9 @@ public class WALSplitter {
|
||||||
Mutation m = null;
|
Mutation m = null;
|
||||||
WALKeyImpl key = null;
|
WALKeyImpl key = null;
|
||||||
WALEdit val = null;
|
WALEdit val = null;
|
||||||
if (logEntry != null) val = new WALEdit();
|
if (logEntry != null) {
|
||||||
|
val = new WALEdit();
|
||||||
|
}
|
||||||
|
|
||||||
for (int i = 0; i < count; i++) {
|
for (int i = 0; i < count; i++) {
|
||||||
// Throw index out of bounds if our cell count is off
|
// Throw index out of bounds if our cell count is off
|
||||||
|
|
Loading…
Reference in New Issue