From d2d0736de40c2b2c7872d2438bf1695e23ded5af Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Wed, 25 Jul 2012 21:44:26 +0000 Subject: [PATCH] HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1365792 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-hdfs/CHANGES.HDFS-3077.txt | 2 + .../hdfs/qjournal/client/AsyncLogger.java | 6 +++ .../hdfs/qjournal/client/AsyncLoggerSet.java | 6 +++ .../qjournal/client/IPCLoggerChannel.java | 11 +++++ .../qjournal/client/QuorumJournalManager.java | 5 +- .../qjournal/protocol/QJournalProtocol.java | 11 ++++- ...JournalProtocolServerSideTranslatorPB.java | 14 ++++++ .../QJournalProtocolTranslatorPB.java | 19 ++++++-- .../hdfs/qjournal/server/JNStorage.java | 2 +- .../hadoop/hdfs/qjournal/server/Journal.java | 37 +++++++++++++++ .../qjournal/server/JournalNodeRpcServer.java | 7 +++ .../src/main/proto/QJournalProtocol.proto | 14 ++++++ .../client/TestQuorumJournalManager.java | 47 +++++++++++++++++++ 13 files changed, 174 insertions(+), 7 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt index 09bab6eeb81..1bb65ea96e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt @@ -4,3 +4,5 @@ This will be merged into the main CHANGES.txt when the branch is merged. HDFS-3077. Quorum-based protocol for reading and writing edit logs. Contributed by Todd Lipcon based on initial work from Brandon Li and Hari Mankude. HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd) + +HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java index 3771bb87567..fc08c0fdda2 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java @@ -68,6 +68,12 @@ interface AsyncLogger { public ListenableFuture finalizeLogSegment( long startTxId, long endTxId); + /** + * Allow the remote node to purge edit logs earlier than this. + * @param minTxIdToKeep the min txid which must be retained + */ + public ListenableFuture purgeLogsOlderThan(long minTxIdToKeep); + /** * @return the state of the last epoch on the target node. */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java index 60a713f9de8..2e0d4ab0bec 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java @@ -113,6 +113,12 @@ class AsyncLoggerSet { logger.close(); } } + + void purgeLogsOlderThan(long minTxIdToKeep) { + for (AsyncLogger logger : loggers) { + logger.purgeLogsOlderThan(minTxIdToKeep); + } + } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index 096e2c3b2a9..4fad0420047 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -291,6 +291,17 @@ public class IPCLoggerChannel implements AsyncLogger { }); } + @Override + public ListenableFuture purgeLogsOlderThan(final long minTxIdToKeep) { + return executor.submit(new Callable() { + @Override + public Void call() throws Exception { + getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep); + return null; + } + }); + } + @Override public ListenableFuture getEditLogManifest( final long fromTxnId) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index f2a6246d05b..3c7f7ea19db 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -298,7 +298,10 @@ public class QuorumJournalManager implements JournalManager { @Override public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { - // TODO Auto-generated method stub + // This purges asynchronously -- there's no need to wait for a quorum + // here, because it's always OK to fail. + LOG.info("Purging remote journals older than txid " + minTxIdToKeep); + loggers.purgeLogsOlderThan(minTxIdToKeep); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java index 900eb3635ce..c9136429e0a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java @@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.qjournal.server.JournalNode; +import org.apache.hadoop.hdfs.server.namenode.JournalManager; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.security.KerberosInfo; @@ -83,13 +84,20 @@ public interface QJournalProtocol { * Finalize the given log segment on the JournalNode. The segment * is expected to be in-progress and starting at the given startTxId. * - * @param startTxId the starting transaction ID of teh log + * @param startTxId the starting transaction ID of the log * @param endTxId the expected last transaction in the given log * @throws IOException if no such segment exists */ public void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException; + /** + * @throws IOException + * @see JournalManager#purgeLogsOlderThan(long) + */ + public void purgeLogsOlderThan(RequestInfo requestInfo, long minTxIdToKeep) + throws IOException; + /** * @param jid the journal from which to enumerate edits * @param sinceTxId the first transaction which the client cares about @@ -110,5 +118,4 @@ public interface QJournalProtocol { */ public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto stateToAccept, URL fromUrl) throws IOException; - } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java index 4859152fb19..e6a68ad37a8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java @@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRec import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; @@ -127,6 +129,18 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP } return FinalizeLogSegmentResponseProto.newBuilder().build(); } + + @Override + public PurgeLogsResponseProto purgeLogs(RpcController controller, + PurgeLogsRequestProto req) throws ServiceException { + try { + impl.purgeLogsOlderThan(convert(req.getReqInfo()), + req.getMinTxIdToKeep()); + } catch (IOException e) { + throw new ServiceException(e); + } + return PurgeLogsResponseProto.getDefaultInstance(); + } @Override public GetEditLogManifestResponseProto getEditLogManifest( diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java index 25b10966a1d..0805773994e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java @@ -23,7 +23,6 @@ import java.net.URL; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; -import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto; import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; @@ -39,18 +38,17 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; +import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; -import org.apache.hadoop.io.MD5Hash; import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RpcClientUtil; -import com.google.protobuf.ByteString; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; @@ -165,6 +163,20 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, throw ProtobufHelper.getRemoteException(e); } } + + @Override + public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep) + throws IOException { + PurgeLogsRequestProto req = PurgeLogsRequestProto.newBuilder() + .setReqInfo(convert(reqInfo)) + .setMinTxIdToKeep(minTxIdToKeep) + .build(); + try { + rpcProxy.purgeLogs(NULL_CONTROLLER, req); + } catch (ServiceException e) { + throw ProtobufHelper.getRemoteException(e); + } + } @Override public GetEditLogManifestResponseProto getEditLogManifest(String jid, @@ -214,4 +226,5 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, QJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, RPC.getProtocolVersion(QJournalProtocolPB.class), methodName); } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java index 55a132a4b53..8639f530bb9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java @@ -92,7 +92,7 @@ class JNStorage extends Storage { return new File(getPaxosDir(), String.valueOf(segmentTxId)); } - private File getPaxosDir() { + File getPaxosDir() { return new File(sd.getCurrentDir(), "paxos"); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 6c4b86207bf..31f7a0abf81 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -28,6 +28,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; @@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.common.StorageErrorReporter; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; +import org.apache.hadoop.hdfs.server.namenode.JournalManager; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; @@ -276,6 +278,41 @@ class Journal implements Closeable { } } + /** + * @see JournalManager#purgeLogsOlderThan(long) + */ + public synchronized void purgeLogsOlderThan(RequestInfo reqInfo, + long minTxIdToKeep) throws IOException { + checkRequest(reqInfo); + + fjm.purgeLogsOlderThan(minTxIdToKeep); + purgePaxosDecisionsOlderThan(minTxIdToKeep); + } + + private void purgePaxosDecisionsOlderThan(long minTxIdToKeep) + throws IOException { + File dir = storage.getPaxosDir(); + for (File f : FileUtil.listFiles(dir)) { + if (!f.isFile()) continue; + + long txid; + try { + txid = Long.valueOf(f.getName()); + } catch (NumberFormatException nfe) { + LOG.warn("Unexpected non-numeric file name for " + f.getAbsolutePath()); + continue; + } + + if (txid < minTxIdToKeep) { + if (!f.delete()) { + LOG.warn("Unable to delete no-longer-needed paxos decision record " + + f); + } + } + } + } + + /** * @see QJournalProtocol#getEditLogManifest(String, long) */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java index 7a7173bc3e5..114fe277dda 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java @@ -130,6 +130,13 @@ class JournalNodeRpcServer implements QJournalProtocol { .finalizeLogSegment(reqInfo, startTxId, endTxId); } + @Override + public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep) + throws IOException { + jn.getOrCreateJournal(reqInfo.getJournalId()) + .purgeLogsOlderThan(reqInfo, minTxIdToKeep); + } + @Override public GetEditLogManifestResponseProto getEditLogManifest(String jid, long sinceTxId) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto index 2834631ed96..38da49150a0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto @@ -86,6 +86,17 @@ message FinalizeLogSegmentRequestProto { message FinalizeLogSegmentResponseProto { } +/** + * purgeLogs() + */ +message PurgeLogsRequestProto { + required RequestInfoProto reqInfo = 1; + required uint64 minTxIdToKeep = 2; +} + +message PurgeLogsResponseProto { +} + /** * getJournalState() */ @@ -175,6 +186,9 @@ service QJournalProtocolService { rpc finalizeLogSegment(FinalizeLogSegmentRequestProto) returns (FinalizeLogSegmentResponseProto); + rpc purgeLogs(PurgeLogsRequestProto) + returns (PurgeLogsResponseProto); + rpc getEditLogManifest(GetEditLogManifestRequestProto) returns (GetEditLogManifestResponseProto); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index a11186ff0d2..7accd73f735 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -346,6 +346,45 @@ public class TestQuorumJournalManager { checkRecovery(cluster, 1, 4); } + @Test + public void testPurgeLogs() throws Exception { + for (int txid = 1; txid <= 5; txid++) { + writeSegment(qjm, txid, 1, true); + } + File curDir = cluster.getCurrentDir(0, JID); + GenericTestUtils.assertGlobEquals(curDir, "edits_.*", + NNStorage.getFinalizedEditsFileName(1, 1), + NNStorage.getFinalizedEditsFileName(2, 2), + NNStorage.getFinalizedEditsFileName(3, 3), + NNStorage.getFinalizedEditsFileName(4, 4), + NNStorage.getFinalizedEditsFileName(5, 5)); + File paxosDir = new File(curDir, "paxos"); + GenericTestUtils.assertExists(paxosDir); + + // Create new files in the paxos directory, which should get purged too. + assertTrue(new File(paxosDir, "1").createNewFile()); + assertTrue(new File(paxosDir, "3").createNewFile()); + + GenericTestUtils.assertGlobEquals(paxosDir, "\\d+", + "1", "3"); + + qjm.purgeLogsOlderThan(3); + + // Log purging is asynchronous, so we have to wait for the calls + // to be sent and respond before verifying. + waitForAllPendingCalls(qjm.getLoggerSetForTests()); + + // Older edits should be purged + GenericTestUtils.assertGlobEquals(curDir, "edits_.*", + NNStorage.getFinalizedEditsFileName(3, 3), + NNStorage.getFinalizedEditsFileName(4, 4), + NNStorage.getFinalizedEditsFileName(5, 5)); + + // Older paxos files should be purged + GenericTestUtils.assertGlobEquals(paxosDir, "\\d+", + "3"); + } + private QuorumJournalManager createSpyingQJM() throws IOException, URISyntaxException { @@ -385,6 +424,14 @@ public class TestQuorumJournalManager { stm.setReadyToFlush(); stm.flush(); } + + private static void waitForAllPendingCalls(AsyncLoggerSet als) + throws InterruptedException { + for (AsyncLogger l : als.getLoggersForTests()) { + IPCLoggerChannel ch = (IPCLoggerChannel)l; + ch.waitForAllPendingCalls(); + } + } private void assertExistsInQuorum(MiniJournalCluster cluster, String fname) {