HDFS-13880. Add mechanism to allow certain RPC calls to bypass sync. Contributed by Chen Liang.
This commit is contained in:
parent
34b05a26ac
commit
613c9e4f7b
|
@ -38,6 +38,7 @@ public interface AlignmentContext {
|
||||||
/**
|
/**
|
||||||
* This is the intended server method call to implement to pass state info
|
* This is the intended server method call to implement to pass state info
|
||||||
* during RPC response header construction.
|
* during RPC response header construction.
|
||||||
|
*
|
||||||
* @param header The RPC response header builder.
|
* @param header The RPC response header builder.
|
||||||
*/
|
*/
|
||||||
void updateResponseState(RpcResponseHeaderProto.Builder header);
|
void updateResponseState(RpcResponseHeaderProto.Builder header);
|
||||||
|
@ -45,6 +46,7 @@ public interface AlignmentContext {
|
||||||
/**
|
/**
|
||||||
* This is the intended client method call to implement to recieve state info
|
* This is the intended client method call to implement to recieve state info
|
||||||
* during RPC response processing.
|
* during RPC response processing.
|
||||||
|
*
|
||||||
* @param header The RPC response header.
|
* @param header The RPC response header.
|
||||||
*/
|
*/
|
||||||
void receiveResponseState(RpcResponseHeaderProto header);
|
void receiveResponseState(RpcResponseHeaderProto header);
|
||||||
|
@ -52,6 +54,7 @@ public interface AlignmentContext {
|
||||||
/**
|
/**
|
||||||
* This is the intended client method call to pull last seen state info
|
* This is the intended client method call to pull last seen state info
|
||||||
* into RPC request processing.
|
* into RPC request processing.
|
||||||
|
*
|
||||||
* @param header The RPC request header builder.
|
* @param header The RPC request header builder.
|
||||||
*/
|
*/
|
||||||
void updateRequestState(RpcRequestHeaderProto.Builder header);
|
void updateRequestState(RpcRequestHeaderProto.Builder header);
|
||||||
|
@ -59,6 +62,7 @@ public interface AlignmentContext {
|
||||||
/**
|
/**
|
||||||
* This is the intended server method call to implement to receive
|
* This is the intended server method call to implement to receive
|
||||||
* client state info during RPC response header processing.
|
* client state info during RPC response header processing.
|
||||||
|
*
|
||||||
* @param header The RPC request header.
|
* @param header The RPC request header.
|
||||||
* @return state id of in the request header.
|
* @return state id of in the request header.
|
||||||
*/
|
*/
|
||||||
|
@ -66,7 +70,19 @@ public interface AlignmentContext {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Returns the last seen state id of the alignment context instance.
|
* Returns the last seen state id of the alignment context instance.
|
||||||
|
*
|
||||||
* @return the value of the last seen state id.
|
* @return the value of the last seen state id.
|
||||||
*/
|
*/
|
||||||
long getLastSeenStateId();
|
long getLastSeenStateId();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return true if this method call does need to be synced, false
|
||||||
|
* otherwise. sync meaning server state needs to have caught up with
|
||||||
|
* client state.
|
||||||
|
*
|
||||||
|
* @param protocolName the name of the protocol
|
||||||
|
* @param method the method call to check
|
||||||
|
* @return true if this method is async, false otherwise.
|
||||||
|
*/
|
||||||
|
boolean isCoordinatedCall(String protocolName, String method);
|
||||||
}
|
}
|
||||||
|
|
|
@ -707,6 +707,7 @@ public abstract class Server {
|
||||||
private int priorityLevel;
|
private int priorityLevel;
|
||||||
// the priority level assigned by scheduler, 0 by default
|
// the priority level assigned by scheduler, 0 by default
|
||||||
private long clientStateId;
|
private long clientStateId;
|
||||||
|
private boolean isCallCoordinated;
|
||||||
|
|
||||||
Call() {
|
Call() {
|
||||||
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
|
this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT,
|
||||||
|
@ -738,6 +739,7 @@ public abstract class Server {
|
||||||
this.traceScope = traceScope;
|
this.traceScope = traceScope;
|
||||||
this.callerContext = callerContext;
|
this.callerContext = callerContext;
|
||||||
this.clientStateId = Long.MIN_VALUE;
|
this.clientStateId = Long.MIN_VALUE;
|
||||||
|
this.isCallCoordinated = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -823,6 +825,14 @@ public abstract class Server {
|
||||||
this.clientStateId = stateId;
|
this.clientStateId = stateId;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void markCallCoordinated(boolean flag) {
|
||||||
|
this.isCallCoordinated = flag;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isCallCoordinated() {
|
||||||
|
return this.isCallCoordinated;
|
||||||
|
}
|
||||||
|
|
||||||
@InterfaceStability.Unstable
|
@InterfaceStability.Unstable
|
||||||
public void deferResponse() {
|
public void deferResponse() {
|
||||||
this.deferredResponse = true;
|
this.deferredResponse = true;
|
||||||
|
@ -2521,10 +2531,32 @@ public abstract class Server {
|
||||||
|
|
||||||
// Save the priority level assignment by the scheduler
|
// Save the priority level assignment by the scheduler
|
||||||
call.setPriorityLevel(callQueue.getPriorityLevel(call));
|
call.setPriorityLevel(callQueue.getPriorityLevel(call));
|
||||||
if(alignmentContext != null) {
|
if(alignmentContext != null && call.rpcRequest != null &&
|
||||||
|
(call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) {
|
||||||
|
// if call.rpcRequest is not RpcProtobufRequest, will skip the following
|
||||||
|
// step and treat the call as uncoordinated. As currently only certain
|
||||||
|
// ClientProtocol methods request made through RPC protobuf needs to be
|
||||||
|
// coordinated.
|
||||||
|
String methodName;
|
||||||
|
String protoName;
|
||||||
|
try {
|
||||||
|
ProtobufRpcEngine.RpcProtobufRequest req =
|
||||||
|
(ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest;
|
||||||
|
methodName = req.getRequestHeader().getMethodName();
|
||||||
|
protoName = req.getRequestHeader().getDeclaringClassProtocolName();
|
||||||
|
} catch (IOException ioe) {
|
||||||
|
throw new RpcServerException("Rpc request header check fail", ioe);
|
||||||
|
}
|
||||||
|
if (!alignmentContext.isCoordinatedCall(protoName, methodName)) {
|
||||||
|
call.markCallCoordinated(false);
|
||||||
|
} else {
|
||||||
|
call.markCallCoordinated(true);
|
||||||
long stateId = alignmentContext.receiveRequestState(header);
|
long stateId = alignmentContext.receiveRequestState(header);
|
||||||
call.setClientStateId(stateId);
|
call.setClientStateId(stateId);
|
||||||
}
|
}
|
||||||
|
} else {
|
||||||
|
call.markCallCoordinated(false);
|
||||||
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
internalQueueCall(call);
|
internalQueueCall(call);
|
||||||
|
@ -2706,8 +2738,8 @@ public abstract class Server {
|
||||||
TraceScope traceScope = null;
|
TraceScope traceScope = null;
|
||||||
try {
|
try {
|
||||||
final Call call = callQueue.take(); // pop the queue; maybe blocked here
|
final Call call = callQueue.take(); // pop the queue; maybe blocked here
|
||||||
if (alignmentContext != null && call.getClientStateId() >
|
if (alignmentContext != null && call.isCallCoordinated() &&
|
||||||
alignmentContext.getLastSeenStateId()) {
|
call.getClientStateId() > alignmentContext.getLastSeenStateId()) {
|
||||||
/*
|
/*
|
||||||
* The call processing should be postponed until the client call's
|
* The call processing should be postponed until the client call's
|
||||||
* state id is aligned (>=) with the server state id.
|
* state id is aligned (>=) with the server state id.
|
||||||
|
|
|
@ -44,6 +44,12 @@ public class ClientGSIContext implements AlignmentContext {
|
||||||
return lastSeenStateId.get();
|
return lastSeenStateId.get();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isCoordinatedCall(String protocolName, String method) {
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"Client should not be checking uncoordinated call");
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Client side implementation only receives state alignment info.
|
* Client side implementation only receives state alignment info.
|
||||||
* It does not provide state alignment info therefore this does nothing.
|
* It does not provide state alignment info therefore this does nothing.
|
||||||
|
|
|
@ -129,7 +129,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException If an I/O error occurred
|
* @throws IOException If an I/O error occurred
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly(atimeAffected = true)
|
@ReadOnly(atimeAffected = true, isCoordinated = true)
|
||||||
LocatedBlocks getBlockLocations(String src, long offset, long length)
|
LocatedBlocks getBlockLocations(String src, long offset, long length)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -139,7 +139,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
FsServerDefaults getServerDefaults() throws IOException;
|
FsServerDefaults getServerDefaults() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -280,7 +280,7 @@ public interface ClientProtocol {
|
||||||
* @return All the in-use block storage policies currently.
|
* @return All the in-use block storage policies currently.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
BlockStoragePolicy[] getStoragePolicies() throws IOException;
|
BlockStoragePolicy[] getStoragePolicies() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -323,7 +323,7 @@ public interface ClientProtocol {
|
||||||
* If file/dir <code>src</code> is not found
|
* If file/dir <code>src</code> is not found
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
BlockStoragePolicy getStoragePolicy(String path) throws IOException;
|
BlockStoragePolicy getStoragePolicy(String path) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -690,7 +690,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException If an I/O error occurred
|
* @throws IOException If an I/O error occurred
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
DirectoryListing getListing(String src, byte[] startAfter,
|
DirectoryListing getListing(String src, byte[] startAfter,
|
||||||
boolean needLocation) throws IOException;
|
boolean needLocation) throws IOException;
|
||||||
|
|
||||||
|
@ -702,7 +702,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException If an I/O error occurred.
|
* @throws IOException If an I/O error occurred.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
SnapshottableDirectoryStatus[] getSnapshottableDirListing()
|
SnapshottableDirectoryStatus[] getSnapshottableDirListing()
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -830,7 +830,7 @@ public interface ClientProtocol {
|
||||||
* a symlink.
|
* a symlink.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
long getPreferredBlockSize(String filename)
|
long getPreferredBlockSize(String filename)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -975,7 +975,7 @@ public interface ClientProtocol {
|
||||||
* cookie returned from the previous call.
|
* cookie returned from the previous call.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
|
CorruptFileBlocks listCorruptFileBlocks(String path, String cookie)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -1011,7 +1011,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException If an I/O error occurred
|
* @throws IOException If an I/O error occurred
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
HdfsFileStatus getFileInfo(String src) throws IOException;
|
HdfsFileStatus getFileInfo(String src) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1026,7 +1026,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException If an I/O error occurred
|
* @throws IOException If an I/O error occurred
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
boolean isFileClosed(String src) throws IOException;
|
boolean isFileClosed(String src) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1043,7 +1043,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException If an I/O error occurred
|
* @throws IOException If an I/O error occurred
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
HdfsFileStatus getFileLinkInfo(String src) throws IOException;
|
HdfsFileStatus getFileLinkInfo(String src) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1057,7 +1057,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException If an I/O error occurred
|
* @throws IOException If an I/O error occurred
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
ContentSummary getContentSummary(String path) throws IOException;
|
ContentSummary getContentSummary(String path) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1170,7 +1170,7 @@ public interface ClientProtocol {
|
||||||
* or an I/O error occurred
|
* or an I/O error occurred
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
String getLinkTarget(String path) throws IOException;
|
String getLinkTarget(String path) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1241,7 +1241,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
DataEncryptionKey getDataEncryptionKey() throws IOException;
|
DataEncryptionKey getDataEncryptionKey() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1310,7 +1310,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException on error
|
* @throws IOException on error
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
|
SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot,
|
||||||
String fromSnapshot, String toSnapshot) throws IOException;
|
String fromSnapshot, String toSnapshot) throws IOException;
|
||||||
|
|
||||||
|
@ -1356,7 +1356,7 @@ public interface ClientProtocol {
|
||||||
* @return A batch of CacheDirectiveEntry objects.
|
* @return A batch of CacheDirectiveEntry objects.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
|
BatchedEntries<CacheDirectiveEntry> listCacheDirectives(
|
||||||
long prevId, CacheDirectiveInfo filter) throws IOException;
|
long prevId, CacheDirectiveInfo filter) throws IOException;
|
||||||
|
|
||||||
|
@ -1398,7 +1398,7 @@ public interface ClientProtocol {
|
||||||
* @return A batch of CachePoolEntry objects.
|
* @return A batch of CachePoolEntry objects.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
BatchedEntries<CachePoolEntry> listCachePools(String prevPool)
|
BatchedEntries<CachePoolEntry> listCachePools(String prevPool)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -1445,7 +1445,7 @@ public interface ClientProtocol {
|
||||||
* Gets the ACLs of files and directories.
|
* Gets the ACLs of files and directories.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
AclStatus getAclStatus(String src) throws IOException;
|
AclStatus getAclStatus(String src) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1459,7 +1459,7 @@ public interface ClientProtocol {
|
||||||
* Get the encryption zone for a path.
|
* Get the encryption zone for a path.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
EncryptionZone getEZForPath(String src)
|
EncryptionZone getEZForPath(String src)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -1471,7 +1471,7 @@ public interface ClientProtocol {
|
||||||
* @return Batch of encryption zones.
|
* @return Batch of encryption zones.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
BatchedEntries<EncryptionZone> listEncryptionZones(
|
BatchedEntries<EncryptionZone> listEncryptionZones(
|
||||||
long prevId) throws IOException;
|
long prevId) throws IOException;
|
||||||
|
|
||||||
|
@ -1496,7 +1496,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(long prevId)
|
BatchedEntries<ZoneReencryptionStatus> listReencryptionStatus(long prevId)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -1530,7 +1530,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
|
List<XAttr> getXAttrs(String src, List<XAttr> xAttrs)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -1546,7 +1546,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
List<XAttr> listXAttrs(String src)
|
List<XAttr> listXAttrs(String src)
|
||||||
throws IOException;
|
throws IOException;
|
||||||
|
|
||||||
|
@ -1581,7 +1581,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException see specific implementation
|
* @throws IOException see specific implementation
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly // TODO : after HDFS-13749 is done, change to coordinated call
|
||||||
void checkAccess(String path, FsAction mode) throws IOException;
|
void checkAccess(String path, FsAction mode) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1590,7 +1590,7 @@ public interface ClientProtocol {
|
||||||
* the starting point for the inotify event stream.
|
* the starting point for the inotify event stream.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
long getCurrentEditLogTxid() throws IOException;
|
long getCurrentEditLogTxid() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1598,7 +1598,7 @@ public interface ClientProtocol {
|
||||||
* transactions for txids equal to or greater than txid.
|
* transactions for txids equal to or greater than txid.
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
EventBatchList getEditsFromTxid(long txid) throws IOException;
|
EventBatchList getEditsFromTxid(long txid) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1656,7 +1656,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
ErasureCodingPolicyInfo[] getErasureCodingPolicies() throws IOException;
|
ErasureCodingPolicyInfo[] getErasureCodingPolicies() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1665,7 +1665,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
Map<String, String> getErasureCodingCodecs() throws IOException;
|
Map<String, String> getErasureCodingCodecs() throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1676,7 +1676,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException;
|
ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1716,7 +1716,7 @@ public interface ClientProtocol {
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@Deprecated
|
@Deprecated
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;
|
BatchedEntries<OpenFileEntry> listOpenFiles(long prevId) throws IOException;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1731,7 +1731,7 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
|
BatchedEntries<OpenFileEntry> listOpenFiles(long prevId,
|
||||||
EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException;
|
EnumSet<OpenFilesType> openFilesTypes, String path) throws IOException;
|
||||||
|
|
||||||
|
@ -1743,6 +1743,6 @@ public interface ClientProtocol {
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
*/
|
*/
|
||||||
@Idempotent
|
@Idempotent
|
||||||
@ReadOnly
|
@ReadOnly(isCoordinated = true)
|
||||||
void msync() throws IOException;
|
void msync() throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -44,4 +44,11 @@ public @interface ReadOnly {
|
||||||
* is only available on the active namenode.
|
* is only available on the active namenode.
|
||||||
*/
|
*/
|
||||||
boolean activeOnly() default false;
|
boolean activeOnly() default false;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @return if true, when processing the rpc call of the target method, the
|
||||||
|
* server side will wait if server state id is behind client (msync). If
|
||||||
|
* false, the method will be processed regardless of server side state.
|
||||||
|
*/
|
||||||
|
boolean isCoordinated() default false;
|
||||||
}
|
}
|
||||||
|
|
|
@ -18,9 +18,13 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hdfs.server.namenode;
|
package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
|
import java.lang.reflect.Method;
|
||||||
|
import java.util.HashSet;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
|
||||||
|
import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly;
|
||||||
import org.apache.hadoop.ipc.AlignmentContext;
|
import org.apache.hadoop.ipc.AlignmentContext;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||||
|
@ -34,12 +38,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||||
class GlobalStateIdContext implements AlignmentContext {
|
class GlobalStateIdContext implements AlignmentContext {
|
||||||
private final FSNamesystem namesystem;
|
private final FSNamesystem namesystem;
|
||||||
|
|
||||||
|
private final HashSet<String> coordinatedMethods;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Server side constructor.
|
* Server side constructor.
|
||||||
* @param namesystem server side state provider
|
* @param namesystem server side state provider
|
||||||
*/
|
*/
|
||||||
GlobalStateIdContext(FSNamesystem namesystem) {
|
GlobalStateIdContext(FSNamesystem namesystem) {
|
||||||
this.namesystem = namesystem;
|
this.namesystem = namesystem;
|
||||||
|
this.coordinatedMethods = new HashSet<>();
|
||||||
|
// For now, only ClientProtocol methods can be coordinated, so only checking
|
||||||
|
// against ClientProtocol.
|
||||||
|
for (Method method : ClientProtocol.class.getDeclaredMethods()) {
|
||||||
|
if (method.isAnnotationPresent(ReadOnly.class) &&
|
||||||
|
method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) {
|
||||||
|
coordinatedMethods.add(method.getName());
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -92,4 +107,10 @@ class GlobalStateIdContext implements AlignmentContext {
|
||||||
public long getLastSeenStateId() {
|
public long getLastSeenStateId() {
|
||||||
return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
|
return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean isCoordinatedCall(String protocolName, String methodName) {
|
||||||
|
return protocolName.equals(ClientProtocol.class.getCanonicalName())
|
||||||
|
&& coordinatedMethods.contains(methodName);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -984,7 +984,7 @@ public class TestQuorumJournalManager {
|
||||||
List<EditLogInputStream> streams = new ArrayList<>();
|
List<EditLogInputStream> streams = new ArrayList<>();
|
||||||
qjm.selectInputStreams(streams, 1, true, true);
|
qjm.selectInputStreams(streams, 1, true, true);
|
||||||
verifyEdits(streams, 1, 5);
|
verifyEdits(streams, 1, 5);
|
||||||
IOUtils.closeStreams(streams.toArray(new Closeable[0]));
|
closeStreams(streams.toArray(new Closeable[0]));
|
||||||
for (AsyncLogger logger : spies) {
|
for (AsyncLogger logger : spies) {
|
||||||
Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
|
Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
|
||||||
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
|
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
|
||||||
|
@ -1000,7 +1000,7 @@ public class TestQuorumJournalManager {
|
||||||
List<EditLogInputStream> streams = new ArrayList<>();
|
List<EditLogInputStream> streams = new ArrayList<>();
|
||||||
qjm.selectInputStreams(streams, 1, true, false);
|
qjm.selectInputStreams(streams, 1, true, false);
|
||||||
verifyEdits(streams, 1, 5);
|
verifyEdits(streams, 1, 5);
|
||||||
IOUtils.closeStreams(streams.toArray(new Closeable[0]));
|
closeStreams(streams.toArray(new Closeable[0]));
|
||||||
for (AsyncLogger logger : spies) {
|
for (AsyncLogger logger : spies) {
|
||||||
Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
|
Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1,
|
||||||
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
|
QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT);
|
||||||
|
@ -1018,7 +1018,7 @@ public class TestQuorumJournalManager {
|
||||||
List<EditLogInputStream> streams = new ArrayList<>();
|
List<EditLogInputStream> streams = new ArrayList<>();
|
||||||
qjm.selectInputStreams(streams, 1, true, false);
|
qjm.selectInputStreams(streams, 1, true, false);
|
||||||
verifyEdits(streams, 1, 10);
|
verifyEdits(streams, 1, 10);
|
||||||
IOUtils.closeStreams(streams.toArray(new Closeable[0]));
|
closeStreams(streams.toArray(new Closeable[0]));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -1060,7 +1060,7 @@ public class TestQuorumJournalManager {
|
||||||
// This should still succeed as the QJM should fall back to the streaming
|
// This should still succeed as the QJM should fall back to the streaming
|
||||||
// mechanism for fetching edits
|
// mechanism for fetching edits
|
||||||
verifyEdits(streams, 1, 11);
|
verifyEdits(streams, 1, 11);
|
||||||
IOUtils.closeStreams(streams.toArray(new Closeable[0]));
|
closeStreams(streams.toArray(new Closeable[0]));
|
||||||
|
|
||||||
for (AsyncLogger logger : spies) {
|
for (AsyncLogger logger : spies) {
|
||||||
Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
|
Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
|
||||||
|
@ -1087,7 +1087,7 @@ public class TestQuorumJournalManager {
|
||||||
// This should still succeed as the QJM should fall back to the streaming
|
// This should still succeed as the QJM should fall back to the streaming
|
||||||
// mechanism for fetching edits
|
// mechanism for fetching edits
|
||||||
verifyEdits(streams, 1, 10);
|
verifyEdits(streams, 1, 10);
|
||||||
IOUtils.closeStreams(streams.toArray(new Closeable[0]));
|
closeStreams(streams.toArray(new Closeable[0]));
|
||||||
|
|
||||||
for (AsyncLogger logger : spies) {
|
for (AsyncLogger logger : spies) {
|
||||||
Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
|
Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true);
|
||||||
|
@ -1148,4 +1148,13 @@ public class TestQuorumJournalManager {
|
||||||
segmentTxId);
|
segmentTxId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void closeStreams(java.io.Closeable... closeables)
|
||||||
|
throws IOException {
|
||||||
|
for (java.io.Closeable c : closeables) {
|
||||||
|
if (c != null) {
|
||||||
|
c.close();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.fs.permission.FsPermission;
|
import org.apache.hadoop.fs.permission.FsPermission;
|
||||||
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
import org.apache.hadoop.hdfs.DistributedFileSystem;
|
||||||
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
import org.apache.hadoop.hdfs.MiniDFSCluster;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
|
||||||
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
import org.apache.hadoop.hdfs.server.namenode.NameNode;
|
||||||
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
import org.apache.hadoop.io.retry.FailoverProxyProvider;
|
||||||
|
@ -342,6 +343,57 @@ public class TestObserverNode {
|
||||||
assertEquals(1, readStatus.get());
|
assertEquals(1, readStatus.get());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUncoordinatedCall() throws Exception {
|
||||||
|
// disable fast tailing so that coordination takes time.
|
||||||
|
conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false);
|
||||||
|
conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS);
|
||||||
|
conf.setTimeDuration(
|
||||||
|
DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS);
|
||||||
|
setUpCluster(1);
|
||||||
|
setObserverRead(true);
|
||||||
|
|
||||||
|
// make a write call so that client will be ahead of
|
||||||
|
// observer for now.
|
||||||
|
dfs.mkdir(testPath, FsPermission.getDefault());
|
||||||
|
|
||||||
|
// a status flag, initialized to 0, after reader finished, this will be
|
||||||
|
// updated to 1, -1 on error
|
||||||
|
AtomicInteger readStatus = new AtomicInteger(0);
|
||||||
|
|
||||||
|
// create a separate thread to make a blocking read.
|
||||||
|
Thread reader = new Thread(() -> {
|
||||||
|
try {
|
||||||
|
// this read call will block until server state catches up. But due to
|
||||||
|
// configuration, this will take a very long time.
|
||||||
|
dfs.getClient().getFileInfo("/");
|
||||||
|
readStatus.set(1);
|
||||||
|
fail("Should have been interrupted before getting here.");
|
||||||
|
} catch (IOException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
readStatus.set(-1);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
reader.start();
|
||||||
|
|
||||||
|
long before = System.currentTimeMillis();
|
||||||
|
dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL);
|
||||||
|
long after = System.currentTimeMillis();
|
||||||
|
|
||||||
|
// should succeed immediately, because datanodeReport is marked an
|
||||||
|
// uncoordinated call, and will not be waiting for server to catch up.
|
||||||
|
assertTrue(after - before < 200);
|
||||||
|
// by this time, reader thread should still be blocking, so the status not
|
||||||
|
// updated
|
||||||
|
assertEquals(0, readStatus.get());
|
||||||
|
Thread.sleep(5000);
|
||||||
|
// reader thread status should still be unchanged after 5 sec...
|
||||||
|
assertEquals(0, readStatus.get());
|
||||||
|
// and the reader thread is not dead, so it must be still waiting
|
||||||
|
assertEquals(Thread.State.WAITING, reader.getState());
|
||||||
|
reader.interrupt();
|
||||||
|
}
|
||||||
|
|
||||||
private void setUpCluster(int numObservers) throws Exception {
|
private void setUpCluster(int numObservers) throws Exception {
|
||||||
qjmhaCluster = new MiniQJMHACluster.Builder(conf)
|
qjmhaCluster = new MiniQJMHACluster.Builder(conf)
|
||||||
.setNumNameNodes(2 + numObservers)
|
.setNumNameNodes(2 + numObservers)
|
||||||
|
|
Loading…
Reference in New Issue