HDFS-13688. [SBN read] Introduce msync API call. Contributed by Chen Liang.

Also fix a issue in IPCLoggerChannel
This commit is contained in:
Erik Krogen 2018-08-01 09:58:04 -07:00 committed by Chen Liang
parent ea6d36763b
commit 33d8aae555
11 changed files with 87 additions and 5 deletions

View File

@ -3026,4 +3026,18 @@ public class DFSClient implements java.io.Closeable, RemotePeerFactory,
checkOpen();
return new OpenFilesIterator(namenode, tracer);
}
/**
* A blocking call to wait for Observer NameNode state ID to reach to the
* current client state ID. Current client state ID is given by the client
* alignment context.
* An assumption is that client alignment context has the state ID set at this
* point. This is become ObserverReadProxyProvider sets up the initial state
* ID when it is being created.
*
* @throws IOException
*/
public void msync() throws IOException {
namenode.msync();
}
}

View File

@ -1565,4 +1565,15 @@ public interface ClientProtocol {
@Idempotent
@ReadOnly
BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;
/**
* Called by client to wait until the server has reached the state id of the
* client. The client and server state id are given by client side and server
* side alignment context respectively. This can be a blocking call.
*
* @throws IOException
*/
@Idempotent
@ReadOnly
void msync() throws IOException;
}

View File

@ -138,6 +138,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MetaSa
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MkdirsRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.OpenFilesBatchResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
@ -1606,4 +1608,13 @@ public class ClientNamenodeProtocolTranslatorPB implements
}
}
@Override
public void msync() throws IOException {
MsyncRequestProto.Builder req = MsyncRequestProto.newBuilder();
try {
rpcProxy.msync(null, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
}

View File

@ -762,6 +762,12 @@ message ListOpenFilesResponseProto {
required bool hasMore = 2;
}
message MsyncRequestProto {
}
message MsyncResponseProto {
}
service ClientNamenodeProtocol {
rpc getBlockLocations(GetBlockLocationsRequestProto)
returns(GetBlockLocationsResponseProto);
@ -916,4 +922,11 @@ service ClientNamenodeProtocol {
returns(GetQuotaUsageResponseProto);
rpc listOpenFiles(ListOpenFilesRequestProto)
returns(ListOpenFilesResponseProto);
<<<<<<< HEAD
=======
rpc msync(MsyncRequestProto)
returns(MsyncResponseProto);
rpc satisfyStoragePolicy(SatisfyStoragePolicyRequestProto)
returns(SatisfyStoragePolicyResponseProto);
>>>>>>> eae0a5d54a2... HDFS-13688. [SBN read] Introduce msync API call. Contributed by Chen Liang.
}

View File

@ -71,7 +71,8 @@ public class TestReadOnly {
"getDataEncryptionKey",
"getCurrentEditLogTxid",
"getEditsFromTxid",
"getQuotaUsage"
"getQuotaUsage",
"msync"
)
);

View File

@ -1396,6 +1396,13 @@ public class RouterClientProtocol implements ClientProtocol {
return null;
}
@Override
public void msync() throws IOException {
// TODO revisit if router should support msync
throw new UnsupportedOperationException(
"msync is not supported for router");
}
/**
* Determines combinations of eligible src/dst locations for a rename. A
* rename cannot change the namespace. Renames are only allowed if there is an

View File

@ -1081,6 +1081,13 @@ public class RouterRpcServer extends AbstractService
return clientProto.listOpenFiles(prevId);
}
@Override
public void msync() throws IOException {
// TODO revisit if router should support msync
throw new UnsupportedOperationException(
"msync is not supported for router");
}
@Override // NamenodeProtocol
public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size)
throws IOException {

View File

@ -154,6 +154,8 @@ import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.Modify
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCacheDirectiveResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.ModifyCachePoolResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.MsyncResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RecoverLeaseResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.RefreshNodesRequestProto;
@ -1585,4 +1587,15 @@ public class ClientNamenodeProtocolServerSideTranslatorPB implements
throw new ServiceException(e);
}
}
@Override
public MsyncResponseProto msync(RpcController controller,
MsyncRequestProto req) throws ServiceException {
try {
server.msync();
return MsyncResponseProto.newBuilder().build();
} catch (IOException e) {
throw new ServiceException(e);
}
}
}

View File

@ -555,8 +555,8 @@ public class IPCLoggerChannel implements AsyncLogger {
new Callable<GetJournaledEditsResponseProto>() {
@Override
public GetJournaledEditsResponseProto call() throws IOException {
return getProxy().getJournaledEdits(journalId, nameServiceId,
fromTxnId, maxTransactions);
return getProxy().getJournaledEdits(journalId, fromTxnId,
maxTransactions);
}
});
}

View File

@ -153,8 +153,8 @@ public interface QJournalProtocol {
* @return Response containing serialized edits to be loaded
* @see org.apache.hadoop.hdfs.qjournal.server.JournaledEditsCache
*/
GetJournaledEditsResponseProto getJournaledEdits(String jid,
String nameServiceId, long sinceTxId, int maxTxns) throws IOException;
GetJournaledEditsResponseProto getJournaledEdits(String jid, long sinceTxId,
int maxTxns) throws IOException;
/**
* Begin the recovery process for a given segment. See the HDFS-3077

View File

@ -1299,6 +1299,11 @@ public class NameNodeRpcServer implements NamenodeProtocols {
return namesystem.listOpenFiles(prevId);
}
@Override // ClientProtocol
public void msync() throws IOException {
// TODO : need to be filled up if needed. May be a no-op here.
}
@Override // ClientProtocol
public CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
throws IOException {