HBASE-19613 Miscellaneous changes to WALSplitter.

* Use ArrayList instead of LinkedList
* Use Apache Commons where appropriate
* Parameterize and improve logging
BELUGA BEHR 2018-01-03 18:29:09 -08:00 committed by Apekshit Sharma
parent 16e8422855
commit 59558f020f
1 changed file with 63 additions and 73 deletions
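
The three bullets in the commit message map to three recurring idioms in the diff below. The following is a minimal sketch for orientation only: it is not code from this patch, the class, method, and messages are invented, and it assumes commons-lang3, commons-collections 3.2+, and SLF4J on the classpath, matching the imports this commit adds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.ArrayUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class CleanupIdioms {
  private static final Logger LOG = LoggerFactory.getLogger(CleanupIdioms.class);

  static void demo(String[] files, List<String> errors, Map<String, Long> seqIds, Exception e) {
    // 1. ArrayList over LinkedList: the buffers in this patch are append-then-iterate,
    //    where ArrayList is more compact and faster to traverse.
    List<String> buffer = new ArrayList<>();

    // 2. Commons null-safe empty checks replace hand-rolled null/length tests.
    if (ArrayUtils.isNotEmpty(files)) {        // was: files != null && files.length > 0
      buffer.add(files[0]);
    }
    if (CollectionUtils.isNotEmpty(errors)) {  // was: errors != null && !errors.isEmpty()
      LOG.warn("{} errors collected", errors.size());
    }
    if (MapUtils.isEmpty(seqIds)) {            // was: seqIds == null || seqIds.isEmpty()
      return;
    }

    // 3. Parameterized logging: {} placeholders defer string building until the
    //    level is enabled; a trailing Throwable argument logs the full stack trace.
    LOG.info("Buffered {} entries", buffer.size());
    LOG.warn("Could not process input", e);    // was: "..." + e.getMessage()
  }
}
```

Note the SLF4J convention used throughout the diff: `{}` placeholders avoid string concatenation when the log level is disabled, and passing the exception as the final argument (with no matching placeholder) preserves the stack trace that the old `getMessage()` concatenation dropped.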


@@ -29,8 +29,8 @@ import java.io.IOException;
 import java.io.InterruptedIOException;
 import java.text.ParseException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
@@ -52,6 +52,9 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.commons.collections.MapUtils;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileAlreadyExistsException;
 import org.apache.hadoop.fs.FileStatus;
@@ -98,7 +101,6 @@ import org.apache.hadoop.ipc.RemoteException;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 /**
  * This class is responsible for splitting up a bunch of regionserver commit log
  * files that are no longer being written to, into new files, one per region, for
@@ -188,7 +190,7 @@ public class WALSplitter {
     final FileStatus[] logfiles = SplitLogManager.getFileList(conf,
         Collections.singletonList(logDir), null);
     List<Path> splits = new ArrayList<>();
-    if (logfiles != null && logfiles.length > 0) {
+    if (ArrayUtils.isNotEmpty(logfiles)) {
       for (FileStatus logfile: logfiles) {
         WALSplitter s = new WALSplitter(factory, conf, rootDir, fs, null, null);
         if (s.splitLogFile(logfile, null)) {
@@ -230,7 +232,7 @@
     this.fileBeingSplit = logfile;
     try {
       long logLength = logfile.getLen();
-      LOG.info("Splitting WAL=" + logPath + ", length=" + logLength);
+      LOG.info("Splitting WAL={}, length={}", logPath, logLength);
       status.setStatus("Opening log file");
       if (reporter != null && !reporter.progress()) {
         progress_failed = true;
@@ -238,7 +240,7 @@
       }
       logFileReader = getReader(logfile, skipErrors, reporter);
       if (logFileReader == null) {
-        LOG.warn("Nothing to split in WAL=" + logPath);
+        LOG.warn("Nothing to split in WAL={}", logPath);
         return true;
       }
       int numOpenedFilesBeforeReporting = conf.getInt("hbase.splitlog.report.openedfiles", 3);
@@ -302,7 +304,7 @@
       iie.initCause(ie);
       throw iie;
     } catch (CorruptedLogFileException e) {
-      LOG.warn("Could not parse, corrupted WAL=" + logPath, e);
+      LOG.warn("Could not parse, corrupted WAL={}", logPath, e);
       if (splitLogWorkerCoordination != null) {
         // Some tests pass in a csm of null.
         splitLogWorkerCoordination.markCorrupted(rootDir, logfile.getPath().getName(), fs);
@@ -315,14 +317,13 @@
       e = e instanceof RemoteException ? ((RemoteException) e).unwrapRemoteException() : e;
       throw e;
     } finally {
-      LOG.debug("Finishing writing output logs and closing down.");
+      LOG.debug("Finishing writing output logs and closing down");
       try {
         if (null != logFileReader) {
           logFileReader.close();
         }
       } catch (IOException exception) {
-        LOG.warn("Could not close WAL reader: " + exception.getMessage());
-        LOG.debug("exception details", exception);
+        LOG.warn("Could not close WAL reader", exception);
       }
       try {
         if (outputSinkStarted) {
@@ -402,11 +403,11 @@
       final FileSystem fs, final Configuration conf) throws IOException {
     final Path corruptDir = new Path(FSUtils.getWALRootDir(conf), HConstants.CORRUPT_DIR_NAME);
     if (conf.get("hbase.regionserver.hlog.splitlog.corrupt.dir") != null) {
-      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to "
-          + corruptDir);
+      LOG.warn("hbase.regionserver.hlog.splitlog.corrupt.dir is deprecated. Default to {}",
+          corruptDir);
     }
     if (!fs.mkdirs(corruptDir)) {
-      LOG.info("Unable to mkdir " + corruptDir);
+      LOG.info("Unable to mkdir {}", corruptDir);
     }
     fs.mkdirs(oldLogDir);
@@ -416,9 +417,9 @@
       Path p = new Path(corruptDir, corrupted.getName());
       if (fs.exists(corrupted)) {
         if (!fs.rename(corrupted, p)) {
-          LOG.warn("Unable to move corrupted log " + corrupted + " to " + p);
+          LOG.warn("Unable to move corrupted log {} to {}", corrupted, p);
         } else {
-          LOG.warn("Moved corrupted log " + corrupted + " to " + p);
+          LOG.warn("Moved corrupted log {} to {}", corrupted, p);
         }
       }
     }
@@ -427,9 +428,9 @@
       Path newPath = AbstractFSWAL.getWALArchivePath(oldLogDir, p);
       if (fs.exists(p)) {
         if (!FSUtils.renameAndSetModifyTime(fs, p, newPath)) {
-          LOG.warn("Unable to move " + p + " to " + newPath);
+          LOG.warn("Unable to move {} to {}", p, newPath);
         } else {
-          LOG.info("Archived processed log " + p + " to " + newPath);
+          LOG.info("Archived processed log {} to {}", p, newPath);
         }
       }
     }
@@ -459,9 +460,9 @@
     Path dir = getRegionDirRecoveredEditsDir(regiondir);
     if (!fs.exists(regiondir)) {
-      LOG.info("This region's directory doesn't exist: "
-          + regiondir.toString() + ". It is very likely that it was" +
-          " already split so it's safe to discard those edits.");
+      LOG.info("This region's directory does not exist: {}."
+          + "It is very likely that it was already split so it is "
+          + "safe to discard those edits.", regiondir);
       return null;
     }
     if (fs.exists(dir) && fs.isFile(dir)) {
@@ -471,16 +472,16 @@
       }
       tmp = new Path(tmp,
           HConstants.RECOVERED_EDITS_DIR + "_" + encodedRegionName);
-      LOG.warn("Found existing old file: " + dir + ". It could be some "
-          + "leftover of an old installation. It should be a folder instead. "
-          + "So moving it to " + tmp);
+      LOG.warn("Found existing old file: {}. It could be some "
+          + "leftover of an old installation. It should be a folder instead. "
+          + "So moving it to {}", dir, tmp);
       if (!fs.rename(dir, tmp)) {
-        LOG.warn("Failed to sideline old file " + dir);
+        LOG.warn("Failed to sideline old file {}", dir);
       }
     }
     if (!fs.exists(dir) && !fs.mkdirs(dir)) {
-      LOG.warn("mkdir failed on " + dir);
+      LOG.warn("mkdir failed on {}", dir);
     }
     // Append fileBeingSplit to prevent name conflict since we may have duplicate wal entries now.
     // Append file name ends with RECOVERED_LOG_TMPFILE_SUFFIX to ensure
@@ -539,8 +540,9 @@
       final Path regiondir) throws IOException {
     NavigableSet<Path> filesSorted = new TreeSet<>();
     Path editsdir = getRegionDirRecoveredEditsDir(regiondir);
-    if (!fs.exists(editsdir))
+    if (!fs.exists(editsdir)) {
       return filesSorted;
+    }
     FileStatus[] files = FSUtils.listStatus(fs, editsdir, new PathFilter() {
       @Override
       public boolean accept(Path p) {
@@ -562,16 +564,13 @@
             result = false;
           }
         } catch (IOException e) {
-          LOG.warn("Failed isFile check on " + p);
+          LOG.warn("Failed isFile check on {}", p, e);
         }
         return result;
       }
     });
-    if (files == null) {
-      return filesSorted;
-    }
-    for (FileStatus status : files) {
-      filesSorted.add(status.getPath());
+    if (ArrayUtils.isNotEmpty(files)) {
+      Arrays.asList(files).forEach(status -> filesSorted.add(status.getPath()));
     }
     return filesSorted;
   }
@@ -590,7 +589,7 @@
     Path moveAsideName = new Path(edits.getParent(), edits.getName() + "."
         + System.currentTimeMillis());
     if (!fs.rename(edits, moveAsideName)) {
-      LOG.warn("Rename failed from " + edits + " to " + moveAsideName);
+      LOG.warn("Rename failed from {} to {}", edits, moveAsideName);
     }
     return moveAsideName;
   }
@@ -640,7 +639,7 @@
             - SEQUENCE_ID_FILE_SUFFIX_LENGTH));
         maxSeqId = Math.max(tmpSeqId, maxSeqId);
       } catch (NumberFormatException ex) {
-        LOG.warn("Invalid SeqId File Name=" + fileName);
+        LOG.warn("Invalid SeqId File Name={}", fileName);
       }
     }
   }
@@ -657,10 +656,8 @@
       if (!fs.createNewFile(newSeqIdFile) && !fs.exists(newSeqIdFile)) {
         throw new IOException("Failed to create SeqId file:" + newSeqIdFile);
       }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Wrote file=" + newSeqIdFile + ", newSeqId=" + newSeqId
-            + ", maxSeqId=" + maxSeqId);
-      }
+      LOG.debug("Wrote file={}, newSeqId={}, maxSeqId={}", newSeqIdFile,
+          newSeqId, maxSeqId);
     } catch (FileAlreadyExistsException ignored) {
       // latest hdfs throws this exception. it's all right if newSeqIdFile already exists
     }
@@ -668,10 +665,9 @@
     // remove old ones
     if (files != null) {
       for (FileStatus status : files) {
-        if (newSeqIdFile.equals(status.getPath())) {
-          continue;
+        if (!newSeqIdFile.equals(status.getPath())) {
+          fs.delete(status.getPath(), false);
         }
-        fs.delete(status.getPath(), false);
       }
     }
     return newSeqId;
@@ -695,7 +691,7 @@
     // zero length even if the file has been sync'd. Revisit if HDFS-376 or
     // HDFS-878 is committed.
     if (length <= 0) {
-      LOG.warn("File " + path + " might be still open, length is 0");
+      LOG.warn("File {} might be still open, length is 0", path);
     }
 
     try {
@@ -709,17 +705,15 @@
           // ignore if this is the last log in sequence.
          // TODO is this scenario still possible if the log has been
           // recovered (i.e. closed)
-          LOG.warn("Could not open " + path + " for reading. File is empty", e);
-          return null;
-        } else {
-          // EOFException being ignored
-          return null;
+          LOG.warn("Could not open {} for reading. File is empty", path, e);
         }
+        // EOFException being ignored
+        return null;
       }
     } catch (IOException e) {
       if (e instanceof FileNotFoundException) {
         // A wal file may not exist anymore. Nothing can be recovered so move on
-        LOG.warn("File " + path + " doesn't exist anymore.", e);
+        LOG.warn("File {} does not exist anymore", path, e);
         return null;
       }
       if (!skipErrors || e instanceof InterruptedIOException) {
@@ -740,7 +734,7 @@
       return in.next();
     } catch (EOFException eof) {
       // truncated files are expected if a RS crashes (see HBASE-2643)
-      LOG.info("EOF from wal " + path + ". continuing");
+      LOG.info("EOF from wal {}. Continuing.", path);
       return null;
     } catch (IOException e) {
       // If the IOE resulted from bad file format,
@@ -748,8 +742,7 @@
       if (e.getCause() != null &&
           (e.getCause() instanceof ParseException ||
           e.getCause() instanceof org.apache.hadoop.fs.ChecksumException)) {
-        LOG.warn("Parse exception " + e.getCause().toString() + " from wal "
-            + path + ". continuing");
+        LOG.warn("Parse exception from wal {}. Continuing", path, e);
         return null;
       }
       if (!skipErrors) {
@@ -871,8 +864,7 @@
       synchronized (controller.dataAvailable) {
         totalBuffered += incrHeap;
         while (totalBuffered > maxHeapUsage && controller.thrown.get() == null) {
-          LOG.debug("Used " + totalBuffered +
-              " bytes of buffered edits, waiting for IO threads...");
+          LOG.debug("Used {} bytes of buffered edits, waiting for IO threads", totalBuffered);
           controller.dataAvailable.wait(2000);
         }
         controller.dataAvailable.notifyAll();
@@ -951,7 +943,7 @@
     RegionEntryBuffer(TableName tableName, byte[] region) {
       this.tableName = tableName;
       this.encodedRegionName = region;
-      this.entryBuffer = new LinkedList<>();
+      this.entryBuffer = new ArrayList<>();
     }
 
     long appendEntry(Entry entry) {
@@ -1012,7 +1004,7 @@
     }
 
     private void doRun() throws IOException {
-      if (LOG.isTraceEnabled()) LOG.trace("Writer thread starting");
+      LOG.trace("Writer thread starting");
       while (true) {
         RegionEntryBuffer buffer = entryBuffers.getChunkToWrite();
         if (buffer == null) {
@@ -1163,7 +1155,7 @@
         }
       }
       controller.checkForErrors();
-      LOG.info(this.writerThreads.size() + " split writers finished; closing...");
+      LOG.info("{} split writers finished; closing.", this.writerThreads.size());
      return (!progress_failed);
     }
@@ -1230,7 +1222,7 @@
       } finally {
         result = close();
         List<IOException> thrown = closeLogWriters(null);
-        if (thrown != null && !thrown.isEmpty()) {
+        if (CollectionUtils.isNotEmpty(thrown)) {
           throw MultipleIOException.createIOException(thrown);
         }
       }
@@ -1249,25 +1241,22 @@
           dstMinLogSeqNum = entry.getKey().getSequenceId();
         }
       } catch (EOFException e) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-              "Got EOF when reading first WAL entry from " + dst + ", an empty or broken WAL file?",
-              e);
-        }
+        LOG.debug("Got EOF when reading first WAL entry from {}, an empty or broken WAL file?",
+            dst, e);
       }
       if (wap.minLogSeqNum < dstMinLogSeqNum) {
         LOG.warn("Found existing old edits file. It could be the result of a previous failed"
             + " split attempt or we have duplicated wal entries. Deleting " + dst + ", length="
             + fs.getFileStatus(dst).getLen());
         if (!fs.delete(dst, false)) {
-          LOG.warn("Failed deleting of old " + dst);
+          LOG.warn("Failed deleting of old {}", dst);
           throw new IOException("Failed deleting of old " + dst);
         }
       } else {
         LOG.warn("Found existing old edits file and we have less entries. Deleting " + wap.p
             + ", length=" + fs.getFileStatus(wap.p).getLen());
         if (!fs.delete(wap.p, false)) {
-          LOG.warn("Failed deleting of " + wap.p);
+          LOG.warn("Failed deleting of {}", wap.p);
           throw new IOException("Failed deleting of " + wap.p);
         }
       }
@@ -1383,7 +1372,6 @@
       if (writersClosed) {
         return thrown;
       }
-
       if (thrown == null) {
         thrown = Lists.newArrayList();
       }
@@ -1409,7 +1397,7 @@
             wap = (WriterAndPath) tmpWAP;
             wap.w.close();
           } catch (IOException ioe) {
-            LOG.error("Couldn't close log at " + wap.p, ioe);
+            LOG.error("Couldn't close log at {}", wap.p, ioe);
             thrown.add(ioe);
             continue;
           }
@@ -1461,18 +1449,18 @@
             + "result of a previous failed split attempt. Deleting " + regionedits + ", length="
             + fs.getFileStatus(regionedits).getLen());
         if (!fs.delete(regionedits, false)) {
-          LOG.warn("Failed delete of old " + regionedits);
+          LOG.warn("Failed delete of old {}", regionedits);
         }
       }
       Writer w = createWriter(regionedits);
-      LOG.debug("Creating writer path=" + regionedits);
+      LOG.debug("Creating writer path={}", regionedits);
       return new WriterAndPath(regionedits, w, entry.getKey().getSequenceId());
     }
 
     private void filterCellByStore(Entry logEntry) {
       Map<byte[], Long> maxSeqIdInStores =
           regionMaxSeqIdInStores.get(Bytes.toString(logEntry.getKey().getEncodedRegionName()));
-      if (maxSeqIdInStores == null || maxSeqIdInStores.isEmpty()) {
+      if (MapUtils.isEmpty(maxSeqIdInStores)) {
         return;
       }
       // Create the array list for the cells that aren't filtered.
@@ -1519,7 +1507,7 @@
         if (LOG.isTraceEnabled()) {
           // This log spews the full edit. Can be massive in the log. Enable only debugging
           // WAL lost edit issues.
-          LOG.trace("getWriterAndPath decided we don't need to write edits for " + logEntry);
+          LOG.trace("getWriterAndPath decided we don't need to write edits for {}", logEntry);
         }
         return;
       }
@@ -1539,7 +1527,7 @@
       } catch (IOException e) {
         e = e instanceof RemoteException ?
             ((RemoteException)e).unwrapRemoteException() : e;
-        LOG.error(HBaseMarkers.FATAL, " Got while writing log entry to log", e);
+        LOG.error(HBaseMarkers.FATAL, "Got while writing log entry to log", e);
         throw e;
       }
     }
@@ -1547,8 +1535,8 @@
     @Override
     public boolean keepRegionEvent(Entry entry) {
       ArrayList<Cell> cells = entry.getEdit().getCells();
-      for (int i = 0; i < cells.size(); i++) {
-        if (WALEdit.isCompactionMarker(cells.get(i))) {
+      for (Cell cell : cells) {
+        if (WALEdit.isCompactionMarker(cell)) {
           return true;
         }
       }
@@ -1677,7 +1665,7 @@
     if (entry == null) {
       // return an empty array
-      return new ArrayList<>();
+      return Collections.emptyList();
     }
 
     long replaySeqId = (entry.getKey().hasOrigSequenceNumber()) ?
@@ -1688,7 +1676,9 @@
     Mutation m = null;
     WALKeyImpl key = null;
     WALEdit val = null;
-    if (logEntry != null) val = new WALEdit();
+    if (logEntry != null) {
+      val = new WALEdit();
+    }
     for (int i = 0; i < count; i++) {
       // Throw index out of bounds if our cell count is off