SOLR-9389: HDFS Transaction logs stay open for writes which leaks Xceivers.

markrmiller 2016-08-26 13:39:59 -04:00
parent b3526c568c
commit aaee4c8205
5 changed files with 93 additions and 66 deletions
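Editorial context, not part of the commit: every stream held open for writing against HDFS pins a DataXceiver thread on each DataNode in its write pipeline, and DataNodes bound those threads via dfs.datanode.max.transfer.threads. Because Solr kept each transaction log's FSDataOutputStream open for the life of the core, finished tlogs slowly ate into that bound. A minimal sketch of the leak and the remedy, using real Hadoop client APIs but a made-up path:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class XceiverLeakSketch {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Hypothetical tlog path; each open writer pins a DataXceiver on its DataNodes.
    FSDataOutputStream out = fs.create(new Path("/solr/tlog.0000000000000000001"));
    out.writeBytes("...log records...");
    out.hflush(); // makes the bytes visible to readers, but the writer (and its Xceiver) stays live
    // The approach this commit takes: once the commit record is written, the log
    // will never be written again, so close the stream and release DataNode resources.
    out.close();
  }
}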

CHANGES.txt

@@ -51,6 +51,8 @@ Bug Fixes
   to be consistent with other places in Solr. Language names still work for backwards
   compatibility. (Uwe Schindler, Boris Steiner)
 
+* SOLR-9389: HDFS Transaction logs stay open for writes which leaks Xceivers. (Tim Owen via Mark Miller)
+
 * SOLR-9188: blockUnknown property makes inter-node communication impossible (noble)
 
 Optimizations

HdfsTransactionLog.java

@@ -64,7 +64,7 @@ public class HdfsTransactionLog extends TransactionLog {
   Path tlogFile;
-
+  private long finalLogSize;
   private FSDataOutputStream tlogOutStream;
   private FileSystem fs;
@@ -144,13 +144,8 @@ public class HdfsTransactionLog extends TransactionLog {
   @Override
   public boolean endsWithCommit() throws IOException {
-    long size;
-    synchronized (this) {
-      fos.flush();
-      tlogOutStream.hflush();
-      size = fos.size();
-    }
-
+    ensureFlushed();
+    long size = getLogSize();
 
     // the end of the file should have the end message (added during a commit) plus a 4 byte size
     byte[] buf = new byte[ END_MESSAGE.length() ];
@@ -159,11 +154,10 @@ public class HdfsTransactionLog extends TransactionLog {
     FSDataFastInputStream dis = new FSDataFastInputStream(fs.open(tlogFile), pos);
     try {
-      //ChannelFastInputStream is = new ChannelFastInputStream(channel, pos);
       dis.read(buf);
       for (int i=0; i<buf.length; i++) {
         if (buf[i] != END_MESSAGE.charAt(i)) return false;
       }
     } finally {
       dis.close();
     }
@@ -176,10 +170,8 @@ public class HdfsTransactionLog extends TransactionLog {
   public void rollback(long pos) throws IOException {
     synchronized (this) {
       assert snapshot_size == pos;
-      fos.flush();
-      tlogOutStream.hflush();
+      ensureFlushed();
 
       // TODO: how do we rollback with hdfs?? We need HDFS-3107
-      //raf.setLength(pos);
       fos.setWritten(pos);
       assert fos.size() == pos;
       numRecords = snapshot_numRecords;
@@ -233,8 +225,10 @@ public class HdfsTransactionLog extends TransactionLog {
       endRecord(pos);
-      fos.flush();  // flush since this will be the last record in a log fill
-      tlogOutStream.hflush();
+      ensureFlushed();  // flush since this will be the last record in a log fill
+
+      // now the commit command is written we will never write to this log again
+      closeOutput();
 
       //assert fos.size() == channel.size();
@@ -255,19 +249,7 @@ public class HdfsTransactionLog extends TransactionLog {
     try {
       // make sure any unflushed buffer has been flushed
-      synchronized (this) {
-        // TODO: optimize this by keeping track of what we have flushed up to
-        fos.flushBuffer();
-        // flush to hdfs
-        tlogOutStream.hflush();
-        /***
-         System.out.println("###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
-         if (fos.size() != raf.length() || pos >= fos.size() ) {
-           throw new RuntimeException("ERROR" + "###flushBuffer to " + fos.size() + " raf.length()=" + raf.length() + " pos="+pos);
-         }
-         ***/
-      }
+      ensureFlushed();
 
       FSDataFastInputStream dis = new FSDataFastInputStream(fs.open(tlogFile),
           pos);
@@ -283,6 +265,52 @@ public class HdfsTransactionLog extends TransactionLog {
     }
   }
 
+  @Override
+  public void closeOutput() {
+    try {
+      doCloseOutput();
+    } catch (IOException e) {
+      log.error("Could not close tlog output", e);
+      // This situation is not fatal to the caller
+    }
+  }
+
+  private void doCloseOutput() throws IOException {
+    synchronized (this) {
+      if (fos == null) return;
+      if (debug) {
+        log.debug("Closing output for " + tlogFile);
+      }
+      fos.flushBuffer();
+      finalLogSize = fos.size();
+      fos = null;
+    }
+
+    tlogOutStream.hflush();
+    tlogOutStream.close();
+    tlogOutStream = null;
+  }
+
+  private void ensureFlushed() throws IOException {
+    synchronized (this) {
+      if (fos != null) {
+        fos.flush();
+        tlogOutStream.hflush();
+      }
+    }
+  }
+
+  @Override
+  public long getLogSize() {
+    synchronized (this) {
+      if (fos != null) {
+        return fos.size();
+      } else {
+        return finalLogSize;
+      }
+    }
+  }
+
   @Override
   public void finish(UpdateLog.SyncLevel syncLevel) {
     if (syncLevel == UpdateLog.SyncLevel.NONE) return;
@@ -309,12 +337,7 @@ public class HdfsTransactionLog extends TransactionLog {
         log.debug("Closing tlog" + this);
       }
 
-      synchronized (this) {
-        fos.flushBuffer();
-      }
-
-      tlogOutStream.hflush();
-      tlogOutStream.close();
+      doCloseOutput();
 
     } catch (IOException e) {
       log.error("Exception closing tlog.", e);
@@ -359,17 +382,19 @@ public class HdfsTransactionLog extends TransactionLog {
     public HDFSLogReader(long startingPos) {
       super();
       incref();
+      initStream(startingPos);
+    }
+
+    private void initStream(long pos) {
       try {
         synchronized (HdfsTransactionLog.this) {
-          fos.flushBuffer();
-          sz = fos.size();
+          ensureFlushed();
+          sz = getLogSize();
         }
-        tlogOutStream.hflush();
 
         FSDataInputStream fdis = fs.open(tlogFile);
-        fis = new FSDataFastInputStream(fdis, startingPos);
+        fis = new FSDataFastInputStream(fdis, pos);
       } catch (IOException e) {
         throw new RuntimeException(e);
       }
@@ -385,10 +410,10 @@ public class HdfsTransactionLog extends TransactionLog {
       synchronized (HdfsTransactionLog.this) {
         if (trace) {
-          log.trace("Reading log record. pos="+pos+" currentSize="+fos.size());
+          log.trace("Reading log record. pos="+pos+" currentSize="+getLogSize());
         }
 
-        if (pos >= fos.size()) {
+        if (pos >= getLogSize()) {
           return null;
         }
       }
@@ -398,16 +423,8 @@ public class HdfsTransactionLog extends TransactionLog {
       if (pos >= sz) {
         log.info("Read available inputstream data, opening new inputstream pos={} sz={}", pos, sz);
-
-        synchronized (HdfsTransactionLog.this) {
-          fos.flushBuffer();
-          sz = fos.size();
-        }
-
-        tlogOutStream.hflush();
         fis.close();
-
-        FSDataInputStream fdis = fs.open(tlogFile);
-        fis = new FSDataFastInputStream(fdis, pos);
+        initStream(pos);
       }
 
       if (pos == 0) {
@@ -415,7 +432,7 @@ public class HdfsTransactionLog extends TransactionLog {
         // shouldn't currently happen - header and first record are currently written at the same time
         synchronized (HdfsTransactionLog.this) {
-          if (fis.position() >= fos.size()) {
+          if (fis.position() >= getLogSize()) {
             return null;
           }
           pos = fis.position();
@@ -443,7 +460,7 @@ public class HdfsTransactionLog extends TransactionLog {
     @Override
     public String toString() {
       synchronized (HdfsTransactionLog.this) {
-        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + getLogSize() + "}";
       }
     }
@@ -454,7 +471,7 @@ public class HdfsTransactionLog extends TransactionLog {
     @Override
     public long currentSize() {
-      return fos.size();
+      return getLogSize();
     }
   }
@@ -478,12 +495,8 @@ public class HdfsTransactionLog extends TransactionLog {
       long sz;
       synchronized (HdfsTransactionLog.this) {
-        fos.flushBuffer();
-
-        // this must be an hflush
-        tlogOutStream.hflush();
-        sz = fos.size();
-        //assert sz == channel.size();
+        ensureFlushed();
+        sz = getLogSize();
       }
 
       fis = new FSDataFastInputStream(fs.open(tlogFile), 0);
@@ -554,7 +567,7 @@ public class HdfsTransactionLog extends TransactionLog {
     @Override
     public String toString() {
       synchronized (HdfsTransactionLog.this) {
-        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}";
+        return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + getLogSize() + "}";
      }
    }

HdfsUpdateLog.java

@@ -219,8 +219,13 @@ public class HdfsUpdateLog extends UpdateLog {
     // It's possible that at abnormal close both "tlog" and "prevTlog" were
     // uncapped.
     for (TransactionLog ll : logs) {
-      newestLogsOnStartup.addFirst(ll);
-      if (newestLogsOnStartup.size() >= 2) break;
+      if (newestLogsOnStartup.size() < 2) {
+        newestLogsOnStartup.addFirst(ll);
+      } else {
+        // We're never going to modify old non-recovery logs - no need to hold their output open
+        log.info("Closing output for old non-recovery log " + ll);
+        ll.closeOutput();
+      }
     }
 
     try {
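Editorial note, not part of the commit: only the two newest tlogs can still need capping or replay after an abnormal shutdown, so they alone keep a writable output; every older log is moved to read-only on startup, releasing one Xceiver per old log immediately rather than at core close.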

TransactionLog.java

@@ -520,6 +520,11 @@ public class TransactionLog implements Closeable {
     }
   }
 
+  /** Move to a read-only state, closing and releasing resources while keeping the log available for reads */
+  public void closeOutput() {
+  }
+
   public void finish(UpdateLog.SyncLevel syncLevel) {
     if (syncLevel == UpdateLog.SyncLevel.NONE) return;
     try {
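Editorial note, not part of the commit: the base-class closeOutput() is deliberately an empty hook. The local-filesystem TransactionLog writes through a RandomAccessFile and holds no remote DataNode resources, so only the HDFS subclass has anything to release.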

UpdateLog.java

@@ -821,11 +821,13 @@ public class UpdateLog implements PluginInfoInitialized {
         try {
           if (ll.endsWithCommit()) {
+            ll.closeOutput();
             ll.decref();
             continue;
           }
         } catch (IOException e) {
           log.error("Error inspecting tlog " + ll, e);
+          ll.closeOutput();
           ll.decref();
           continue;
         }
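Taken together (editorial summary, not part of the commit): a tlog's output is now closed at every point where it is known the log will never be written again: when writeCommit() caps it, when startup inspection finds it already ends with a commit, when inspection of it fails, and, for HDFS, when it is an old non-recovery log at startup.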