HBASE-1430 Read the logs in batches during log splitting to avoid OOME
git-svn-id: https://svn.apache.org/repos/asf/hadoop/hbase/trunk@776501 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
d6afa6748a
commit
b812635201
|
@ -270,6 +270,7 @@ Release 0.20.0 - Unreleased
|
|||
minutes/hours
|
||||
HBASE-1420 add abliity to add and remove (table) indexes on existing
|
||||
tables (Clint Morgan via Stack)
|
||||
HBASE-1430 Read the logs in batches during log splitting to avoid OOME
|
||||
|
||||
OPTIMIZATIONS
|
||||
HBASE-1412 Change values for delete column and column family in KeyValue
|
||||
|
|
|
@ -110,15 +110,22 @@ class ServerManager implements HConstants {
|
|||
int numServers = serverAddressToServerInfo.size();
|
||||
int numDeadServers = deadServers.size();
|
||||
double averageLoad = getAverageLoad();
|
||||
LOG.info(numServers + " region servers, " + numDeadServers +
|
||||
" dead, average load " + averageLoad);
|
||||
String deadServersList = null;
|
||||
if (numDeadServers > 0) {
|
||||
LOG.info("DEAD [");
|
||||
StringBuilder sb = new StringBuilder("Dead Server [");
|
||||
boolean first = true;
|
||||
for (String server: deadServers) {
|
||||
LOG.info(" " + server);
|
||||
if (!first) {
|
||||
sb.append(", ");
|
||||
first = false;
|
||||
}
|
||||
sb.append(server);
|
||||
}
|
||||
LOG.info("]");
|
||||
sb.append("]");
|
||||
deadServersList = sb.toString();
|
||||
}
|
||||
LOG.info(numServers + " region servers, " + numDeadServers +
|
||||
" dead, average load " + averageLoad + (deadServersList != null? deadServers: ""));
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -411,7 +418,7 @@ class ServerManager implements HConstants {
|
|||
for (int i = 0; i < incomingMsgs.length; i++) {
|
||||
HRegionInfo region = incomingMsgs[i].getRegionInfo();
|
||||
LOG.info("Received " + incomingMsgs[i] + " from " +
|
||||
serverInfo.getServerName() + "; " + i + " of " + incomingMsgs.length);
|
||||
serverInfo.getServerName() + "; " + (i + 1) + " of " + incomingMsgs.length);
|
||||
switch (incomingMsgs[i].getType()) {
|
||||
case MSG_REPORT_PROCESS_OPEN:
|
||||
openingCount++;
|
||||
|
|
|
@ -774,147 +774,155 @@ public class HLog implements HConstants, Syncable {
|
|||
throws IOException {
|
||||
final Map<byte [], SequenceFile.Writer> logWriters =
|
||||
new TreeMap<byte [], SequenceFile.Writer>(Bytes.BYTES_COMPARATOR);
|
||||
final Map<byte[], LinkedList<HLogEntry>> logEntries =
|
||||
new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
|
||||
|
||||
try {
|
||||
for (int i = 0; i < logfiles.length; i++) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
|
||||
": " + logfiles[i].getPath() + ", length=" + logfiles[i].getLen());
|
||||
}
|
||||
// Check for possibly empty file. With appends, currently Hadoop reports
|
||||
// a zero length even if the file has been sync'd. Revisit if
|
||||
// HADOOP-4751 is committed.
|
||||
long length = logfiles[i].getLen();
|
||||
HLogKey key = new HLogKey();
|
||||
KeyValue val = new KeyValue();
|
||||
SequenceFile.Reader in = null;
|
||||
try {
|
||||
in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
|
||||
try {
|
||||
int count = 0;
|
||||
while (in.next(key, val)) {
|
||||
byte [] regionName = key.getRegionName();
|
||||
LinkedList<HLogEntry> queue = logEntries.get(regionName);
|
||||
if (queue == null) {
|
||||
queue = new LinkedList<HLogEntry>();
|
||||
LOG.debug("Adding queue for " + Bytes.toString(regionName));
|
||||
logEntries.put(regionName, queue);
|
||||
}
|
||||
queue.push(new HLogEntry(val, key));
|
||||
count++;
|
||||
}
|
||||
LOG.debug("Pushed " + count + " entries from " +
|
||||
logfiles[i].getPath());
|
||||
} catch (IOException e) {
|
||||
e = RemoteExceptionHandler.checkIOException(e);
|
||||
if (!(e instanceof EOFException)) {
|
||||
LOG.warn("Exception processing " + logfiles[i].getPath() +
|
||||
" -- continuing. Possible DATA LOSS!", e);
|
||||
}
|
||||
int maxSteps = Double.valueOf(Math.ceil((logfiles.length * 1.0) /
|
||||
DEFAULT_NUMBER_CONCURRENT_LOG_READS)).intValue();
|
||||
for(int step = 0; step < maxSteps; step++) {
|
||||
final Map<byte[], LinkedList<HLogEntry>> logEntries =
|
||||
new TreeMap<byte[], LinkedList<HLogEntry>>(Bytes.BYTES_COMPARATOR);
|
||||
// Stop at logfiles.length when it's the last step
|
||||
int endIndex = step == maxSteps - 1? logfiles.length:
|
||||
step * DEFAULT_NUMBER_CONCURRENT_LOG_READS +
|
||||
DEFAULT_NUMBER_CONCURRENT_LOG_READS;
|
||||
for (int i = (step * 10); i < endIndex; i++) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Splitting hlog " + (i + 1) + " of " + logfiles.length +
|
||||
": " + logfiles[i].getPath() +
|
||||
", length=" + logfiles[i].getLen());
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (length <= 0) {
|
||||
LOG.warn("Empty hlog, continuing: " + logfiles[i]);
|
||||
continue;
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
try {
|
||||
if (in != null) {
|
||||
in.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Close in finally threw exception -- continuing", e);
|
||||
}
|
||||
// Delete the input file now so we do not replay edits. We could
|
||||
// have gotten here because of an exception. If so, probably
|
||||
// nothing we can do about it. Replaying it, it could work but we
|
||||
// could be stuck replaying for ever. Just continue though we
|
||||
// could have lost some edits.
|
||||
fs.delete(logfiles[i].getPath(), true);
|
||||
}
|
||||
}
|
||||
ExecutorService threadPool =
|
||||
Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
|
||||
for (final byte[] key : logEntries.keySet()) {
|
||||
|
||||
Thread thread = new Thread(Bytes.toString(key)) {
|
||||
public void run() {
|
||||
LinkedList<HLogEntry> entries = logEntries.get(key);
|
||||
LOG.debug("Thread got " + entries.size() + " to process");
|
||||
long threadTime = System.currentTimeMillis();
|
||||
try {
|
||||
// Check for possibly empty file. With appends, currently Hadoop
|
||||
// reports a zero length even if the file has been sync'd. Revisit if
|
||||
// HADOOP-4751 is committed.
|
||||
long length = logfiles[i].getLen();
|
||||
HLogKey key = new HLogKey();
|
||||
KeyValue val = new KeyValue();
|
||||
SequenceFile.Reader in = null;
|
||||
int count = 0;
|
||||
for (HLogEntry logEntry : entries) {
|
||||
SequenceFile.Writer w = logWriters.get(key);
|
||||
if (w == null) {
|
||||
Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
|
||||
.getTableDir(rootDir, logEntry.getKey().getTablename()),
|
||||
HRegionInfo.encodeRegionName(key)),
|
||||
HREGION_OLDLOGFILE_NAME);
|
||||
Path oldlogfile = null;
|
||||
SequenceFile.Reader old = null;
|
||||
if (fs.exists(logfile)) {
|
||||
LOG.warn("Old hlog file " + logfile
|
||||
+ " already exists. Copying existing file to new file");
|
||||
oldlogfile = new Path(logfile.toString() + ".old");
|
||||
fs.rename(logfile, oldlogfile);
|
||||
old = new SequenceFile.Reader(fs, oldlogfile, conf);
|
||||
}
|
||||
w = SequenceFile.createWriter(fs, conf, logfile,
|
||||
HLogKey.class, KeyValue.class, getCompressionType(conf));
|
||||
// Use copy of regionName; regionName object is reused inside
|
||||
// in
|
||||
// HStoreKey.getRegionName so its content changes as we
|
||||
// iterate.
|
||||
logWriters.put(key, w);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating new hlog file writer for path "
|
||||
+ logfile + " and region " + Bytes.toString(key));
|
||||
}
|
||||
|
||||
if (old != null) {
|
||||
// Copy from existing log file
|
||||
HLogKey oldkey = new HLogKey();
|
||||
KeyValue oldval = new KeyValue();
|
||||
for (; old.next(oldkey, oldval); count++) {
|
||||
if (LOG.isDebugEnabled() && count > 0
|
||||
&& count % 10000 == 0) {
|
||||
LOG.debug("Copied " + count + " edits");
|
||||
}
|
||||
w.append(oldkey, oldval);
|
||||
}
|
||||
old.close();
|
||||
fs.delete(oldlogfile, true);
|
||||
}
|
||||
try {
|
||||
in = new SequenceFile.Reader(fs, logfiles[i].getPath(), conf);
|
||||
try {
|
||||
while (in.next(key, val)) {
|
||||
byte [] regionName = key.getRegionName();
|
||||
LinkedList<HLogEntry> queue = logEntries.get(regionName);
|
||||
if (queue == null) {
|
||||
queue = new LinkedList<HLogEntry>();
|
||||
LOG.debug("Adding queue for " + Bytes.toString(regionName));
|
||||
logEntries.put(regionName, queue);
|
||||
}
|
||||
w.append(logEntry.getKey(), logEntry.getEdit());
|
||||
queue.push(new HLogEntry(val, key));
|
||||
count++;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Applied " + count + " total edits to "
|
||||
+ Bytes.toString(key) + " in "
|
||||
+ (System.currentTimeMillis() - threadTime) + "ms");
|
||||
LOG.debug("Pushed " + count + " entries from " +
|
||||
logfiles[i].getPath());
|
||||
} catch (IOException e) {
|
||||
LOG.debug("IOE Pushed " + count + " entries from " +
|
||||
logfiles[i].getPath());
|
||||
e = RemoteExceptionHandler.checkIOException(e);
|
||||
if (!(e instanceof EOFException)) {
|
||||
LOG.warn("Exception processing " + logfiles[i].getPath() +
|
||||
" -- continuing. Possible DATA LOSS!", e);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
if (length <= 0) {
|
||||
LOG.warn("Empty hlog, continuing: " + logfiles[i]);
|
||||
continue;
|
||||
}
|
||||
throw e;
|
||||
} finally {
|
||||
try {
|
||||
if (in != null) {
|
||||
in.close();
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e = RemoteExceptionHandler.checkIOException(e);
|
||||
LOG.warn("Got while writing region " + Bytes.toString(key)
|
||||
+ " log " + e);
|
||||
e.printStackTrace();
|
||||
LOG.warn("Close in finally threw exception -- continuing", e);
|
||||
}
|
||||
// Delete the input file now so we do not replay edits. We could
|
||||
// have gotten here because of an exception. If so, probably
|
||||
// nothing we can do about it. Replaying it, it could work but we
|
||||
// could be stuck replaying for ever. Just continue though we
|
||||
// could have lost some edits.
|
||||
fs.delete(logfiles[i].getPath(), true);
|
||||
}
|
||||
};
|
||||
threadPool.execute(thread);
|
||||
}
|
||||
threadPool.shutdown();
|
||||
// Wait for all threads to terminate
|
||||
try {
|
||||
for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS) ; i++) {
|
||||
LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
|
||||
}
|
||||
}catch(InterruptedException ex) {
|
||||
LOG.warn("Hlog writers were interrupted, possible data loss!");
|
||||
ExecutorService threadPool =
|
||||
Executors.newFixedThreadPool(DEFAULT_NUMBER_LOG_WRITER_THREAD);
|
||||
for (final byte[] key : logEntries.keySet()) {
|
||||
|
||||
Thread thread = new Thread(Bytes.toString(key)) {
|
||||
public void run() {
|
||||
LinkedList<HLogEntry> entries = logEntries.get(key);
|
||||
LOG.debug("Thread got " + entries.size() + " to process");
|
||||
long threadTime = System.currentTimeMillis();
|
||||
try {
|
||||
int count = 0;
|
||||
for (HLogEntry logEntry : entries) {
|
||||
SequenceFile.Writer w = logWriters.get(key);
|
||||
if (w == null) {
|
||||
Path logfile = new Path(HRegion.getRegionDir(HTableDescriptor
|
||||
.getTableDir(rootDir, logEntry.getKey().getTablename()),
|
||||
HRegionInfo.encodeRegionName(key)),
|
||||
HREGION_OLDLOGFILE_NAME);
|
||||
Path oldlogfile = null;
|
||||
SequenceFile.Reader old = null;
|
||||
if (fs.exists(logfile)) {
|
||||
LOG.warn("Old hlog file " + logfile
|
||||
+ " already exists. Copying existing file to new file");
|
||||
oldlogfile = new Path(logfile.toString() + ".old");
|
||||
fs.rename(logfile, oldlogfile);
|
||||
old = new SequenceFile.Reader(fs, oldlogfile, conf);
|
||||
}
|
||||
w = SequenceFile.createWriter(fs, conf, logfile,
|
||||
HLogKey.class, KeyValue.class, getCompressionType(conf));
|
||||
logWriters.put(key, w);
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating new hlog file writer for path "
|
||||
+ logfile + " and region " + Bytes.toString(key));
|
||||
}
|
||||
|
||||
if (old != null) {
|
||||
// Copy from existing log file
|
||||
HLogKey oldkey = new HLogKey();
|
||||
KeyValue oldval = new KeyValue();
|
||||
for (; old.next(oldkey, oldval); count++) {
|
||||
if (LOG.isDebugEnabled() && count > 0
|
||||
&& count % 10000 == 0) {
|
||||
LOG.debug("Copied " + count + " edits");
|
||||
}
|
||||
w.append(oldkey, oldval);
|
||||
}
|
||||
old.close();
|
||||
fs.delete(oldlogfile, true);
|
||||
}
|
||||
}
|
||||
w.append(logEntry.getKey(), logEntry.getEdit());
|
||||
count++;
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Applied " + count + " total edits to "
|
||||
+ Bytes.toString(key) + " in "
|
||||
+ (System.currentTimeMillis() - threadTime) + "ms");
|
||||
}
|
||||
} catch (IOException e) {
|
||||
e = RemoteExceptionHandler.checkIOException(e);
|
||||
LOG.warn("Got while writing region " + Bytes.toString(key)
|
||||
+ " log " + e);
|
||||
e.printStackTrace();
|
||||
}
|
||||
}
|
||||
};
|
||||
threadPool.execute(thread);
|
||||
}
|
||||
threadPool.shutdown();
|
||||
// Wait for all threads to terminate
|
||||
try {
|
||||
for(int i = 0; !threadPool.awaitTermination(5, TimeUnit.SECONDS); i++) {
|
||||
LOG.debug("Waiting for hlog writers to terminate, iteration #" + i);
|
||||
}
|
||||
}catch(InterruptedException ex) {
|
||||
LOG.warn("Hlog writers were interrupted, possible data loss!");
|
||||
}
|
||||
}
|
||||
} finally {
|
||||
for (SequenceFile.Writer w : logWriters.values()) {
|
||||
|
|
Loading…
Reference in New Issue