HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1365792 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-07-25 21:44:26 +00:00
parent 939f4a9f92
commit d2d0736de4
13 changed files with 174 additions and 7 deletions

View File

@ -4,3 +4,5 @@ This will be merged into the main CHANGES.txt when the branch is merged.
HDFS-3077. Quorum-based protocol for reading and writing edit logs. Contributed by Todd Lipcon based on initial work from Brandon Li and Hari Mankude. HDFS-3077. Quorum-based protocol for reading and writing edit logs. Contributed by Todd Lipcon based on initial work from Brandon Li and Hari Mankude.
HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd) HDFS-3694. Fix getEditLogManifest to fetch httpPort if necessary (todd)
HDFS-3692. Support purgeEditLogs() call to remotely purge logs on JNs (todd)

View File

@ -68,6 +68,12 @@ interface AsyncLogger {
public ListenableFuture<Void> finalizeLogSegment( public ListenableFuture<Void> finalizeLogSegment(
long startTxId, long endTxId); long startTxId, long endTxId);
/**
* Allow the remote node to purge edit logs earlier than this.
* @param minTxIdToKeep the min txid which must be retained
* @return a future tracking the purge request; it carries no result value
*/
public ListenableFuture<Void> purgeLogsOlderThan(long minTxIdToKeep);
/** /**
* @return the state of the last epoch on the target node. * @return the state of the last epoch on the target node.
*/ */

View File

@ -114,6 +114,12 @@ class AsyncLoggerSet {
} }
} }
/**
 * Ask every logger in this set to purge logs older than the given txid.
 * The futures returned by the individual loggers are discarded, so this
 * method neither waits for nor reports the outcome of any purge.
 *
 * @param minTxIdToKeep the min txid which must be retained
 */
void purgeLogsOlderThan(long minTxIdToKeep) {
  for (AsyncLogger remote : loggers) {
    remote.purgeLogsOlderThan(minTxIdToKeep);
  }
}
/** /**
* Wait for a quorum of loggers to respond to the given call. If a quorum * Wait for a quorum of loggers to respond to the given call. If a quorum

View File

@ -291,6 +291,17 @@ public class IPCLoggerChannel implements AsyncLogger {
}); });
} }
/**
 * Queue a purge request for the remote journal on this channel's executor.
 * The request info is created and the proxy call is made when the queued
 * task runs, not when this method is invoked.
 *
 * @param minTxIdToKeep the min txid which must be retained
 * @return a future for the queued call; it yields {@code null} on success
 */
@Override
public ListenableFuture<Void> purgeLogsOlderThan(final long minTxIdToKeep) {
  final Callable<Void> purgeCall = new Callable<Void>() {
    @Override
    public Void call() throws Exception {
      getProxy().purgeLogsOlderThan(createReqInfo(), minTxIdToKeep);
      return null;
    }
  };
  return executor.submit(purgeCall);
}
@Override @Override
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest( public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId) { final long fromTxnId) {

View File

@ -298,7 +298,10 @@ public class QuorumJournalManager implements JournalManager {
@Override @Override
public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException { public void purgeLogsOlderThan(long minTxIdToKeep) throws IOException {
// TODO Auto-generated method stub // This purges asynchronously -- there's no need to wait for a quorum
// here, because it's always OK to fail.
LOG.info("Purging remote journals older than txid " + minTxIdToKeep);
loggers.purgeLogsOlderThan(minTxIdToKeep);
} }
@Override @Override

View File

@ -29,6 +29,7 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.server.JournalNode; import org.apache.hadoop.hdfs.qjournal.server.JournalNode;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.security.KerberosInfo; import org.apache.hadoop.security.KerberosInfo;
@ -83,13 +84,20 @@ public interface QJournalProtocol {
* Finalize the given log segment on the JournalNode. The segment * Finalize the given log segment on the JournalNode. The segment
* is expected to be in-progress and starting at the given startTxId. * is expected to be in-progress and starting at the given startTxId.
* *
* @param startTxId the starting transaction ID of teh log * @param startTxId the starting transaction ID of the log
* @param endTxId the expected last transaction in the given log * @param endTxId the expected last transaction in the given log
* @throws IOException if no such segment exists * @throws IOException if no such segment exists
*/ */
public void finalizeLogSegment(RequestInfo reqInfo, public void finalizeLogSegment(RequestInfo reqInfo,
long startTxId, long endTxId) throws IOException; long startTxId, long endTxId) throws IOException;
/**
* Purge edit logs older than the given transaction ID on the JournalNode.
*
* @param requestInfo the request metadata; it names the target journal
* @param minTxIdToKeep the minimum transaction ID which must be retained
* @throws IOException if the request fails on the JournalNode
* (NOTE(review): presumably covers both a rejected request and a failed
* purge -- confirm against the server-side implementation)
* @see JournalManager#purgeLogsOlderThan(long)
*/
public void purgeLogsOlderThan(RequestInfo requestInfo, long minTxIdToKeep)
throws IOException;
/** /**
* @param jid the journal from which to enumerate edits * @param jid the journal from which to enumerate edits
* @param sinceTxId the first transaction which the client cares about * @param sinceTxId the first transaction which the client cares about
@ -110,5 +118,4 @@ public interface QJournalProtocol {
*/ */
public void acceptRecovery(RequestInfo reqInfo, public void acceptRecovery(RequestInfo reqInfo,
SegmentStateProto stateToAccept, URL fromUrl) throws IOException; SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
} }

View File

@ -36,6 +36,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRec
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
@ -128,6 +130,18 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
return FinalizeLogSegmentResponseProto.newBuilder().build(); return FinalizeLogSegmentResponseProto.newBuilder().build();
} }
/**
 * Server-side handler for the purgeLogs RPC: unpacks the protobuf request,
 * hands it to the underlying protocol implementation, and answers with the
 * (empty) default response.
 *
 * @throws ServiceException wrapping any IOException raised while handling
 *         the request
 */
@Override
public PurgeLogsResponseProto purgeLogs(RpcController controller,
    PurgeLogsRequestProto req) throws ServiceException {
  try {
    RequestInfo reqInfo = convert(req.getReqInfo());
    long minTxIdToKeep = req.getMinTxIdToKeep();
    impl.purgeLogsOlderThan(reqInfo, minTxIdToKeep);
    return PurgeLogsResponseProto.getDefaultInstance();
  } catch (IOException ioe) {
    throw new ServiceException(ioe);
  }
}
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest( public GetEditLogManifestResponseProto getEditLogManifest(
RpcController controller, GetEditLogManifestRequestProto request) RpcController controller, GetEditLogManifestRequestProto request)

View File

@ -23,7 +23,6 @@ import java.net.URL;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.RemoteEditLogProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelper; import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
@ -39,18 +38,17 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochR
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.AcceptRecoveryRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PurgeLogsRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo; import org.apache.hadoop.hdfs.qjournal.protocol.RequestInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol; import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.io.MD5Hash;
import org.apache.hadoop.ipc.ProtobufHelper; import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface; import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RpcClientUtil; import org.apache.hadoop.ipc.RpcClientUtil;
import com.google.protobuf.ByteString;
import com.google.protobuf.RpcController; import com.google.protobuf.RpcController;
import com.google.protobuf.ServiceException; import com.google.protobuf.ServiceException;
@ -166,6 +164,20 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
} }
} }
/**
 * Client-side stub for the purgeLogs RPC: packs the arguments into a
 * protobuf request and sends it through the RPC proxy.
 *
 * @param reqInfo the request metadata to convert and send
 * @param minTxIdToKeep the min txid which must be retained
 * @throws IOException the remote exception unwrapped from a failed call
 */
@Override
public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
    throws IOException {
  final PurgeLogsRequestProto purgeRequest =
      PurgeLogsRequestProto.newBuilder()
          .setReqInfo(convert(reqInfo))
          .setMinTxIdToKeep(minTxIdToKeep)
          .build();
  try {
    rpcProxy.purgeLogs(NULL_CONTROLLER, purgeRequest);
  } catch (ServiceException se) {
    throw ProtobufHelper.getRemoteException(se);
  }
}
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId) throws IOException { long sinceTxId) throws IOException {
@ -214,4 +226,5 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
QJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER, QJournalProtocolPB.class, RPC.RpcKind.RPC_PROTOCOL_BUFFER,
RPC.getProtocolVersion(QJournalProtocolPB.class), methodName); RPC.getProtocolVersion(QJournalProtocolPB.class), methodName);
} }
} }

View File

@ -92,7 +92,7 @@ class JNStorage extends Storage {
return new File(getPaxosDir(), String.valueOf(segmentTxId)); return new File(getPaxosDir(), String.valueOf(segmentTxId));
} }
private File getPaxosDir() { File getPaxosDir() {
return new File(sd.getCurrentDir(), "paxos"); return new File(sd.getCurrentDir(), "paxos");
} }

View File

@ -28,6 +28,7 @@ import java.util.List;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto; import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
@ -39,6 +40,7 @@ import org.apache.hadoop.hdfs.server.common.StorageErrorReporter;
import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream; import org.apache.hadoop.hdfs.server.namenode.EditLogOutputStream;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager;
import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile; import org.apache.hadoop.hdfs.server.namenode.FileJournalManager.EditLogFile;
import org.apache.hadoop.hdfs.server.namenode.JournalManager;
import org.apache.hadoop.hdfs.server.namenode.TransferFsImage; import org.apache.hadoop.hdfs.server.namenode.TransferFsImage;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
@ -276,6 +278,41 @@ class Journal implements Closeable {
} }
} }
/**
* Purge edit logs and paxos decision records older than the given
* transaction ID, after checking the request.
*
* @param reqInfo the request metadata, passed to checkRequest
* @param minTxIdToKeep the minimum transaction ID which must be retained
* @throws IOException if the request check or either purge step fails
* @see JournalManager#purgeLogsOlderThan(long)
*/
public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
long minTxIdToKeep) throws IOException {
// The request is checked before anything is deleted.
// NOTE(review): presumably checkRequest throws for an invalid request -- confirm.
checkRequest(reqInfo);
// Purge the edit log files first, then the paxos decision records.
fjm.purgeLogsOlderThan(minTxIdToKeep);
purgePaxosDecisionsOlderThan(minTxIdToKeep);
}
/**
 * Delete paxos decision records whose transaction ID is below the given
 * threshold.
 *
 * Records live in the storage's paxos directory, one regular file per
 * segment, named by the decimal segment txid. Entries that are not regular
 * files, or whose name is not a number, are left in place. A failed delete
 * is logged and otherwise ignored.
 *
 * @param minTxIdToKeep records with a txid strictly below this are deleted
 * @throws IOException if the paxos directory cannot be listed
 */
private void purgePaxosDecisionsOlderThan(long minTxIdToKeep)
    throws IOException {
  File dir = storage.getPaxosDir();
  for (File f : FileUtil.listFiles(dir)) {
    if (!f.isFile()) continue;

    long txid;
    try {
      // parseLong yields the primitive directly; Long.valueOf would box the
      // value only for it to be unboxed again by the assignment.
      txid = Long.parseLong(f.getName());
    } catch (NumberFormatException nfe) {
      LOG.warn("Unexpected non-numeric file name for " + f.getAbsolutePath());
      continue;
    }

    if (txid < minTxIdToKeep) {
      if (!f.delete()) {
        // Best effort: a leftover record is not fatal, so only warn.
        LOG.warn("Unable to delete no-longer-needed paxos decision record " +
            f);
      }
    }
  }
}
/** /**
* @see QJournalProtocol#getEditLogManifest(String, long) * @see QJournalProtocol#getEditLogManifest(String, long)
*/ */

View File

@ -130,6 +130,13 @@ class JournalNodeRpcServer implements QJournalProtocol {
.finalizeLogSegment(reqInfo, startTxId, endTxId); .finalizeLogSegment(reqInfo, startTxId, endTxId);
} }
/**
* Forward a purge request to the journal named by the request's journal ID.
* The journal is obtained through getOrCreateJournal, and the request info
* is passed along unchanged together with the txid threshold.
*/
@Override
public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
.purgeLogsOlderThan(reqInfo, minTxIdToKeep);
}
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId) throws IOException { long sinceTxId) throws IOException {

View File

@ -86,6 +86,17 @@ message FinalizeLogSegmentRequestProto {
message FinalizeLogSegmentResponseProto { message FinalizeLogSegmentResponseProto {
} }
/**
* purgeLogs()
*/
message PurgeLogsRequestProto {
// Request metadata accompanying the call.
required RequestInfoProto reqInfo = 1;
// The minimum transaction ID which must be retained.
required uint64 minTxIdToKeep = 2;
}
// Empty response: purgeLogs() returns no data.
message PurgeLogsResponseProto {
}
/** /**
* getJournalState() * getJournalState()
*/ */
@ -175,6 +186,9 @@ service QJournalProtocolService {
rpc finalizeLogSegment(FinalizeLogSegmentRequestProto) rpc finalizeLogSegment(FinalizeLogSegmentRequestProto)
returns (FinalizeLogSegmentResponseProto); returns (FinalizeLogSegmentResponseProto);
rpc purgeLogs(PurgeLogsRequestProto)
returns (PurgeLogsResponseProto);
rpc getEditLogManifest(GetEditLogManifestRequestProto) rpc getEditLogManifest(GetEditLogManifestRequestProto)
returns (GetEditLogManifestResponseProto); returns (GetEditLogManifestResponseProto);

View File

@ -346,6 +346,45 @@ public class TestQuorumJournalManager {
checkRecovery(cluster, 1, 4); checkRecovery(cluster, 1, 4);
} }
/**
 * Verify that purging through the quorum journal manager removes both the
 * finalized edit segments and the paxos files below the retained txid, and
 * leaves everything at or above it in place.
 */
@Test
public void testPurgeLogs() throws Exception {
  // Write five segments, starting at txids 1 through 5.
  for (int segmentTxId = 1; segmentTxId <= 5; segmentTxId++) {
    writeSegment(qjm, segmentTxId, 1, true);
  }

  final String editsGlob = "edits_.*";
  final String paxosGlob = "\\d+";

  final File currentDir = cluster.getCurrentDir(0, JID);
  GenericTestUtils.assertGlobEquals(currentDir, editsGlob,
      NNStorage.getFinalizedEditsFileName(1, 1),
      NNStorage.getFinalizedEditsFileName(2, 2),
      NNStorage.getFinalizedEditsFileName(3, 3),
      NNStorage.getFinalizedEditsFileName(4, 4),
      NNStorage.getFinalizedEditsFileName(5, 5));

  final File paxosDir = new File(currentDir, "paxos");
  GenericTestUtils.assertExists(paxosDir);

  // Create new files in the paxos directory, which should get purged too.
  assertTrue(new File(paxosDir, "1").createNewFile());
  assertTrue(new File(paxosDir, "3").createNewFile());
  GenericTestUtils.assertGlobEquals(paxosDir, paxosGlob, "1", "3");

  final long minTxIdToKeep = 3;
  qjm.purgeLogsOlderThan(minTxIdToKeep);

  // Log purging is asynchronous, so we have to wait for the calls
  // to be sent and respond before verifying.
  waitForAllPendingCalls(qjm.getLoggerSetForTests());

  // Only the segments at or above the retained txid should remain.
  GenericTestUtils.assertGlobEquals(currentDir, editsGlob,
      NNStorage.getFinalizedEditsFileName(3, 3),
      NNStorage.getFinalizedEditsFileName(4, 4),
      NNStorage.getFinalizedEditsFileName(5, 5));

  // Likewise, only the paxos file at the retained txid should remain.
  GenericTestUtils.assertGlobEquals(paxosDir, paxosGlob, "3");
}
private QuorumJournalManager createSpyingQJM() private QuorumJournalManager createSpyingQJM()
throws IOException, URISyntaxException { throws IOException, URISyntaxException {
@ -386,6 +425,14 @@ public class TestQuorumJournalManager {
stm.flush(); stm.flush();
} }
/**
 * Block until every logger in the given set has no calls pending.
 * Each logger is cast to {@link IPCLoggerChannel}, so the set must contain
 * only loggers of that type.
 *
 * @param als the logger set whose channels should be drained
 * @throws InterruptedException if interrupted while waiting
 */
private static void waitForAllPendingCalls(AsyncLoggerSet als)
    throws InterruptedException {
  for (AsyncLogger logger : als.getLoggersForTests()) {
    ((IPCLoggerChannel) logger).waitForAllPendingCalls();
  }
}
private void assertExistsInQuorum(MiniJournalCluster cluster, private void assertExistsInQuorum(MiniJournalCluster cluster,
String fname) { String fname) {
int count = 0; int count = 0;