HDFS-11180. Intermittent deadlock in NameNode when failover happens.
This commit is contained in:
parent
45bc79d135
commit
6d8df4e81e
|
@ -202,6 +202,9 @@ Release 2.7.4 - UNRELEASED
|
||||||
|
|
||||||
HDFS-11174. Wrong HttpFS test command in doc. (John Zhuge via aajisaka)
|
HDFS-11174. Wrong HttpFS test command in doc. (John Zhuge via aajisaka)
|
||||||
|
|
||||||
|
HDFS-11180. Intermittent deadlock in NameNode when failover happens.
|
||||||
|
(aajisaka)
|
||||||
|
|
||||||
Release 2.7.3 - 2016-08-25
|
Release 2.7.3 - 2016-08-25
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
|
|
@ -128,6 +128,33 @@
|
||||||
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
|
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
|
||||||
<Field name="journalSet" />
|
<Field name="journalSet" />
|
||||||
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||||
|
</Match>
|
||||||
|
<!--
|
||||||
|
FSEditLog#getTotalSyncCount is not synchronized because this method is
|
||||||
|
used by metrics. NullPointerException can happen and it is ignored.
|
||||||
|
-->
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
|
||||||
|
<Field name="editLogStream" />
|
||||||
|
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||||
|
</Match>
|
||||||
|
<!--
|
||||||
|
FSEditLog#isOpenForWriteWithoutLock and FSEditLog#isSegmentOpenWithoutLock
|
||||||
|
are not synchronized because these methods are used by metrics.
|
||||||
|
-->
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
|
||||||
|
<Field name="state" />
|
||||||
|
<Bug pattern="IS2_INCONSISTENT_SYNC" />
|
||||||
|
</Match>
|
||||||
|
<!--
|
||||||
|
All of the threads which update/increment txid are synchronized,
|
||||||
|
so make txid volatile instead of AtomicLong.
|
||||||
|
-->
|
||||||
|
<Match>
|
||||||
|
<Class name="org.apache.hadoop.hdfs.server.namenode.FSEditLog" />
|
||||||
|
<Field name="txid" />
|
||||||
|
<Bug pattern="VO_VOLATILE_INCREMENT" />
|
||||||
</Match>
|
</Match>
|
||||||
<!--
|
<!--
|
||||||
This method isn't performance-critical and is much clearer to write as it's written.
|
This method isn't performance-critical and is much clearer to write as it's written.
|
||||||
|
|
|
@ -153,14 +153,16 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
private EditLogOutputStream editLogStream = null;
|
private EditLogOutputStream editLogStream = null;
|
||||||
|
|
||||||
// a monotonically increasing counter that represents transactionIds.
|
// a monotonically increasing counter that represents transactionIds.
|
||||||
private long txid = 0;
|
// All of the threads which update/increment txid are synchronized,
|
||||||
|
// so make txid volatile instead of AtomicLong.
|
||||||
|
private volatile long txid = 0;
|
||||||
|
|
||||||
// stores the last synced transactionId.
|
// stores the last synced transactionId.
|
||||||
private long synctxid = 0;
|
private long synctxid = 0;
|
||||||
|
|
||||||
// the first txid of the log that's currently open for writing.
|
// the first txid of the log that's currently open for writing.
|
||||||
// If this value is N, we are currently writing to edits_inprogress_N
|
// If this value is N, we are currently writing to edits_inprogress_N
|
||||||
private long curSegmentTxId = HdfsConstants.INVALID_TXID;
|
private volatile long curSegmentTxId = HdfsConstants.INVALID_TXID;
|
||||||
|
|
||||||
// the time of printing the statistics to the log file.
|
// the time of printing the statistics to the log file.
|
||||||
private long lastPrintTime;
|
private long lastPrintTime;
|
||||||
|
@ -331,7 +333,18 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
return state == State.IN_SEGMENT ||
|
return state == State.IN_SEGMENT ||
|
||||||
state == State.BETWEEN_LOG_SEGMENTS;
|
state == State.BETWEEN_LOG_SEGMENTS;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true if the log is currently open in write mode.
|
||||||
|
* This method is not synchronized and must be used only for metrics.
|
||||||
|
* @return true if the log is currently open in write mode, regardless
|
||||||
|
* of whether it actually has an open segment.
|
||||||
|
*/
|
||||||
|
boolean isOpenForWriteWithoutLock() {
|
||||||
|
return state == State.IN_SEGMENT ||
|
||||||
|
state == State.BETWEEN_LOG_SEGMENTS;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if the log is open in write mode and has a segment open
|
* @return true if the log is open in write mode and has a segment open
|
||||||
* ready to take edits.
|
* ready to take edits.
|
||||||
|
@ -340,6 +353,16 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
return state == State.IN_SEGMENT;
|
return state == State.IN_SEGMENT;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true the state is IN_SEGMENT.
|
||||||
|
* This method is not synchronized and must be used only for metrics.
|
||||||
|
* @return true if the log is open in write mode and has a segment open
|
||||||
|
* ready to take edits.
|
||||||
|
*/
|
||||||
|
boolean isSegmentOpenWithoutLock() {
|
||||||
|
return state == State.IN_SEGMENT;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return true if the log is open in read mode.
|
* @return true if the log is open in read mode.
|
||||||
*/
|
*/
|
||||||
|
@ -513,7 +536,16 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
public synchronized long getLastWrittenTxId() {
|
public synchronized long getLastWrittenTxId() {
|
||||||
return txid;
|
return txid;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the transaction ID of the last transaction written to the log.
|
||||||
|
* This method is not synchronized and must be used only for metrics.
|
||||||
|
* @return The transaction ID of the last transaction written to the log
|
||||||
|
*/
|
||||||
|
long getLastWrittenTxIdWithoutLock() {
|
||||||
|
return txid;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the first transaction ID in the current log segment
|
* @return the first transaction ID in the current log segment
|
||||||
*/
|
*/
|
||||||
|
@ -522,7 +554,16 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
"Bad state: %s", state);
|
"Bad state: %s", state);
|
||||||
return curSegmentTxId;
|
return curSegmentTxId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the first transaction ID in the current log segment.
|
||||||
|
* This method is not synchronized and must be used only for metrics.
|
||||||
|
* @return The first transaction ID in the current log segment
|
||||||
|
*/
|
||||||
|
long getCurSegmentTxIdWithoutLock() {
|
||||||
|
return curSegmentTxId;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Set the transaction ID to use for the next transaction written.
|
* Set the transaction ID to use for the next transaction written.
|
||||||
*/
|
*/
|
||||||
|
@ -1166,7 +1207,9 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
/**
|
/**
|
||||||
* Get all the journals this edit log is currently operating on.
|
* Get all the journals this edit log is currently operating on.
|
||||||
*/
|
*/
|
||||||
synchronized List<JournalAndStream> getJournals() {
|
List<JournalAndStream> getJournals() {
|
||||||
|
// The list implementation is CopyOnWriteArrayList,
|
||||||
|
// so we don't need to synchronize this method.
|
||||||
return journalSet.getAllJournalStreams();
|
return journalSet.getAllJournalStreams();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1174,7 +1217,7 @@ public class FSEditLog implements LogsPurgeable {
|
||||||
* Used only by tests.
|
* Used only by tests.
|
||||||
*/
|
*/
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
synchronized public JournalSet getJournalSet() {
|
public JournalSet getJournalSet() {
|
||||||
return journalSet;
|
return journalSet;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1003,7 +1003,7 @@ public class FSImage implements Closeable {
|
||||||
Canceler canceler) throws IOException {
|
Canceler canceler) throws IOException {
|
||||||
FSImageCompression compression =
|
FSImageCompression compression =
|
||||||
FSImageCompression.createCompression(conf);
|
FSImageCompression.createCompression(conf);
|
||||||
long txid = getLastAppliedOrWrittenTxId();
|
long txid = getCorrectLastAppliedOrWrittenTxId();
|
||||||
SaveNamespaceContext ctx = new SaveNamespaceContext(source, txid,
|
SaveNamespaceContext ctx = new SaveNamespaceContext(source, txid,
|
||||||
canceler);
|
canceler);
|
||||||
FSImageFormat.Saver saver = new FSImageFormat.Saver(ctx);
|
FSImageFormat.Saver saver = new FSImageFormat.Saver(ctx);
|
||||||
|
@ -1101,7 +1101,7 @@ public class FSImage implements Closeable {
|
||||||
if (editLogWasOpen) {
|
if (editLogWasOpen) {
|
||||||
editLog.endCurrentLogSegment(true);
|
editLog.endCurrentLogSegment(true);
|
||||||
}
|
}
|
||||||
long imageTxId = getLastAppliedOrWrittenTxId();
|
long imageTxId = getCorrectLastAppliedOrWrittenTxId();
|
||||||
if (!addToCheckpointing(imageTxId)) {
|
if (!addToCheckpointing(imageTxId)) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
"FS image is being downloaded from another NN at txid " + imageTxId);
|
"FS image is being downloaded from another NN at txid " + imageTxId);
|
||||||
|
@ -1463,6 +1463,15 @@ public class FSImage implements Closeable {
|
||||||
}
|
}
|
||||||
|
|
||||||
public long getLastAppliedOrWrittenTxId() {
|
public long getLastAppliedOrWrittenTxId() {
|
||||||
|
return Math.max(lastAppliedTxId,
|
||||||
|
editLog != null ? editLog.getLastWrittenTxIdWithoutLock() : 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This method holds a lock of FSEditLog to get the correct value.
|
||||||
|
* This method must not be used for metrics.
|
||||||
|
*/
|
||||||
|
public long getCorrectLastAppliedOrWrittenTxId() {
|
||||||
return Math.max(lastAppliedTxId,
|
return Math.max(lastAppliedTxId,
|
||||||
editLog != null ? editLog.getLastWrittenTxId() : 0);
|
editLog != null ? editLog.getLastWrittenTxId() : 0);
|
||||||
}
|
}
|
||||||
|
|
|
@ -4516,7 +4516,7 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
//create ha status
|
//create ha status
|
||||||
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
|
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
|
||||||
haContext.getState().getServiceState(),
|
haContext.getState().getServiceState(),
|
||||||
getFSImage().getLastAppliedOrWrittenTxId());
|
getFSImage().getCorrectLastAppliedOrWrittenTxId());
|
||||||
|
|
||||||
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
|
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
|
||||||
} finally {
|
} finally {
|
||||||
|
@ -4775,17 +4775,17 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
@Metric({"TransactionsSinceLastLogRoll",
|
@Metric({"TransactionsSinceLastLogRoll",
|
||||||
"Number of transactions since last edit log roll"})
|
"Number of transactions since last edit log roll"})
|
||||||
public long getTransactionsSinceLastLogRoll() {
|
public long getTransactionsSinceLastLogRoll() {
|
||||||
if (isInStandbyState() || !getEditLog().isSegmentOpen()) {
|
if (isInStandbyState() || !getEditLog().isSegmentOpenWithoutLock()) {
|
||||||
return 0;
|
return 0;
|
||||||
} else {
|
} else {
|
||||||
return getEditLog().getLastWrittenTxId() -
|
return getEditLog().getLastWrittenTxIdWithoutLock() -
|
||||||
getEditLog().getCurSegmentTxId() + 1;
|
getEditLog().getCurSegmentTxIdWithoutLock() + 1;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Metric({"LastWrittenTransactionId", "Transaction ID written to the edit log"})
|
@Metric({"LastWrittenTransactionId", "Transaction ID written to the edit log"})
|
||||||
public long getLastWrittenTransactionId() {
|
public long getLastWrittenTransactionId() {
|
||||||
return getEditLog().getLastWrittenTxId();
|
return getEditLog().getLastWrittenTxIdWithoutLock();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Metric({"LastCheckpointTime",
|
@Metric({"LastCheckpointTime",
|
||||||
|
@ -7102,7 +7102,9 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean,
|
||||||
List<Map<String, String>> jasList = new ArrayList<Map<String, String>>();
|
List<Map<String, String>> jasList = new ArrayList<Map<String, String>>();
|
||||||
FSEditLog log = getFSImage().getEditLog();
|
FSEditLog log = getFSImage().getEditLog();
|
||||||
if (log != null) {
|
if (log != null) {
|
||||||
boolean openForWrite = log.isOpenForWrite();
|
// This flag can be false because we cannot hold a lock of FSEditLog
|
||||||
|
// for metrics.
|
||||||
|
boolean openForWrite = log.isOpenForWriteWithoutLock();
|
||||||
for (JournalAndStream jas : log.getJournals()) {
|
for (JournalAndStream jas : log.getJournals()) {
|
||||||
final Map<String, String> jasMap = new HashMap<String, String>();
|
final Map<String, String> jasMap = new HashMap<String, String>();
|
||||||
String manager = jas.getManager().toString();
|
String manager = jas.getManager().toString();
|
||||||
|
|
|
@ -1104,7 +1104,7 @@ class NameNodeRpcServer implements NamenodeProtocols {
|
||||||
checkNNStartup();
|
checkNNStartup();
|
||||||
namesystem.checkOperation(OperationCategory.UNCHECKED);
|
namesystem.checkOperation(OperationCategory.UNCHECKED);
|
||||||
namesystem.checkSuperuserPrivilege();
|
namesystem.checkSuperuserPrivilege();
|
||||||
return namesystem.getFSImage().getLastAppliedOrWrittenTxId();
|
return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override // NamenodeProtocol
|
@Override // NamenodeProtocol
|
||||||
|
|
|
@ -165,7 +165,7 @@ public class StandbyCheckpointer {
|
||||||
FSImage img = namesystem.getFSImage();
|
FSImage img = namesystem.getFSImage();
|
||||||
|
|
||||||
long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
|
long prevCheckpointTxId = img.getStorage().getMostRecentCheckpointTxId();
|
||||||
long thisCheckpointTxId = img.getLastAppliedOrWrittenTxId();
|
long thisCheckpointTxId = img.getCorrectLastAppliedOrWrittenTxId();
|
||||||
assert thisCheckpointTxId >= prevCheckpointTxId;
|
assert thisCheckpointTxId >= prevCheckpointTxId;
|
||||||
if (thisCheckpointTxId == prevCheckpointTxId) {
|
if (thisCheckpointTxId == prevCheckpointTxId) {
|
||||||
LOG.info("A checkpoint was triggered but the Standby Node has not " +
|
LOG.info("A checkpoint was triggered but the Standby Node has not " +
|
||||||
|
@ -253,7 +253,7 @@ public class StandbyCheckpointer {
|
||||||
|
|
||||||
private long countUncheckpointedTxns() {
|
private long countUncheckpointedTxns() {
|
||||||
FSImage img = namesystem.getFSImage();
|
FSImage img = namesystem.getFSImage();
|
||||||
return img.getLastAppliedOrWrittenTxId() -
|
return img.getCorrectLastAppliedOrWrittenTxId() -
|
||||||
img.getStorage().getMostRecentCheckpointTxId();
|
img.getStorage().getMostRecentCheckpointTxId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -159,4 +159,28 @@ public class TestFSNamesystemMBean {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// The test makes sure JMX request can be processed even if FSEditLog
|
||||||
|
// is synchronized.
|
||||||
|
@Test
|
||||||
|
public void testWithFSEditLogLock() throws Exception {
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
MiniDFSCluster cluster = null;
|
||||||
|
try {
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf).build();
|
||||||
|
cluster.waitActive();
|
||||||
|
synchronized (cluster.getNameNode().getFSImage().getEditLog()) {
|
||||||
|
MBeanClient client = new MBeanClient();
|
||||||
|
client.start();
|
||||||
|
client.join(20000);
|
||||||
|
assertTrue("JMX calls are blocked when FSEditLog" +
|
||||||
|
" is synchronized by another thread", client.succeeded);
|
||||||
|
client.interrupt();
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
if (cluster != null) {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue