From aaee4c820556ca0f62d51f939e907864e4262a32 Mon Sep 17 00:00:00 2001 From: markrmiller Date: Fri, 26 Aug 2016 13:39:59 -0400 Subject: [PATCH] SOLR-9389: HDFS Transaction logs stay open for writes which leaks Xceivers. --- solr/CHANGES.txt | 2 + .../solr/update/HdfsTransactionLog.java | 141 ++++++++++-------- .../org/apache/solr/update/HdfsUpdateLog.java | 9 +- .../apache/solr/update/TransactionLog.java | 5 + .../org/apache/solr/update/UpdateLog.java | 2 + 5 files changed, 93 insertions(+), 66 deletions(-) diff --git a/solr/CHANGES.txt b/solr/CHANGES.txt index 7c762d25407..f227db8a2cd 100644 --- a/solr/CHANGES.txt +++ b/solr/CHANGES.txt @@ -51,6 +51,8 @@ Bug Fixes to be consistent with other places in Solr. Language names still work for backwards compatibility. (Uwe Schindler, Boris Steiner) +* SOLR-9389: HDFS Transaction logs stay open for writes which leaks Xceivers. (Tim Owen via Mark Miller) + * SOLR-9188: blockUnknown property makes inter-node communication impossible (noble) Optimizations diff --git a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java index 7ccbb95cc0f..e725127a8fc 100644 --- a/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java +++ b/solr/core/src/java/org/apache/solr/update/HdfsTransactionLog.java @@ -64,7 +64,7 @@ public class HdfsTransactionLog extends TransactionLog { Path tlogFile; - + private long finalLogSize; private FSDataOutputStream tlogOutStream; private FileSystem fs; @@ -144,13 +144,8 @@ public class HdfsTransactionLog extends TransactionLog { @Override public boolean endsWithCommit() throws IOException { - long size; - synchronized (this) { - fos.flush(); - tlogOutStream.hflush(); - size = fos.size(); - } - + ensureFlushed(); + long size = getLogSize(); // the end of the file should have the end message (added during a commit) plus a 4 byte size byte[] buf = new byte[ END_MESSAGE.length() ]; @@ -159,11 +154,10 @@ public class HdfsTransactionLog extends TransactionLog { FSDataFastInputStream dis = new FSDataFastInputStream(fs.open(tlogFile), pos); try { - //ChannelFastInputStream is = new ChannelFastInputStream(channel, pos); - dis.read(buf); - for (int i=0; i= fos.size()) { + if (fis.position() >= getLogSize()) { return null; } pos = fis.position(); @@ -443,7 +460,7 @@ public class HdfsTransactionLog extends TransactionLog { @Override public String toString() { synchronized (HdfsTransactionLog.this) { - return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}"; + return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + getLogSize() + "}"; } } @@ -454,7 +471,7 @@ public class HdfsTransactionLog extends TransactionLog { @Override public long currentSize() { - return fos.size(); + return getLogSize(); } } @@ -478,12 +495,8 @@ public class HdfsTransactionLog extends TransactionLog { long sz; synchronized (HdfsTransactionLog.this) { - fos.flushBuffer(); - - // this must be an hflush - tlogOutStream.hflush(); - sz = fos.size(); - //assert sz == channel.size(); + ensureFlushed(); + sz = getLogSize(); } fis = new FSDataFastInputStream(fs.open(tlogFile), 0); @@ -554,7 +567,7 @@ public class HdfsTransactionLog extends TransactionLog { @Override public String toString() { synchronized (HdfsTransactionLog.this) { - return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + fos.size() + "}"; + return "LogReader{" + "file=" + tlogFile + ", position=" + fis.position() + ", end=" + getLogSize() + "}"; } } diff --git a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java index 4cbcf4f51ca..764b0998b1e 100644 --- a/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/HdfsUpdateLog.java @@ -219,8 +219,13 @@ public class HdfsUpdateLog extends UpdateLog { // It's possible that at abnormal close both "tlog" and "prevTlog" were // uncapped. for (TransactionLog ll : logs) { - newestLogsOnStartup.addFirst(ll); - if (newestLogsOnStartup.size() >= 2) break; + if (newestLogsOnStartup.size() < 2) { + newestLogsOnStartup.addFirst(ll); + } else { + // We're never going to modify old non-recovery logs - no need to hold their output open + log.info("Closing output for old non-recovery log " + ll); + ll.closeOutput(); + } } try { diff --git a/solr/core/src/java/org/apache/solr/update/TransactionLog.java b/solr/core/src/java/org/apache/solr/update/TransactionLog.java index f7213edbb78..997485a1495 100644 --- a/solr/core/src/java/org/apache/solr/update/TransactionLog.java +++ b/solr/core/src/java/org/apache/solr/update/TransactionLog.java @@ -520,6 +520,11 @@ public class TransactionLog implements Closeable { } } + /** Move to a read-only state, closing and releasing resources while keeping the log available for reads */ + public void closeOutput() { + + } + public void finish(UpdateLog.SyncLevel syncLevel) { if (syncLevel == UpdateLog.SyncLevel.NONE) return; try { diff --git a/solr/core/src/java/org/apache/solr/update/UpdateLog.java b/solr/core/src/java/org/apache/solr/update/UpdateLog.java index 0b4fc1895d2..5b917b88811 100644 --- a/solr/core/src/java/org/apache/solr/update/UpdateLog.java +++ b/solr/core/src/java/org/apache/solr/update/UpdateLog.java @@ -821,11 +821,13 @@ public class UpdateLog implements PluginInfoInitialized { try { if (ll.endsWithCommit()) { + ll.closeOutput(); ll.decref(); continue; } } catch (IOException e) { log.error("Error inspecting tlog " + ll, e); + ll.closeOutput(); ll.decref(); continue; }