diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java index 0e8b960ecd3..a435ff6c4e6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/AlignmentContext.java @@ -38,6 +38,7 @@ public interface AlignmentContext { /** * This is the intended server method call to implement to pass state info * during RPC response header construction. + * * @param header The RPC response header builder. */ void updateResponseState(RpcResponseHeaderProto.Builder header); @@ -45,6 +46,7 @@ public interface AlignmentContext { /** * This is the intended client method call to implement to recieve state info * during RPC response processing. + * * @param header The RPC response header. */ void receiveResponseState(RpcResponseHeaderProto header); @@ -52,6 +54,7 @@ public interface AlignmentContext { /** * This is the intended client method call to pull last seen state info * into RPC request processing. + * * @param header The RPC request header builder. */ void updateRequestState(RpcRequestHeaderProto.Builder header); @@ -59,6 +62,7 @@ public interface AlignmentContext { /** * This is the intended server method call to implement to receive * client state info during RPC response header processing. + * * @param header The RPC request header. * @return state id of in the request header. */ @@ -66,7 +70,19 @@ public interface AlignmentContext { /** * Returns the last seen state id of the alignment context instance. + * * @return the value of the last seen state id. */ long getLastSeenStateId(); + + /** + * Return true if this method call does need to be synced, false + * otherwise. sync meaning server state needs to have caught up with + * client state. + * + * @param protocolName the name of the protocol + * @param method the method call to check + * @return true if this method is async, false otherwise. + */ + boolean isCoordinatedCall(String protocolName, String method); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index bf0d68d7aa9..1cbf8b8fea7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -707,6 +707,7 @@ public abstract class Server { private int priorityLevel; // the priority level assigned by scheduler, 0 by default private long clientStateId; + private boolean isCallCoordinated; Call() { this(RpcConstants.INVALID_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, @@ -738,6 +739,7 @@ public abstract class Server { this.traceScope = traceScope; this.callerContext = callerContext; this.clientStateId = Long.MIN_VALUE; + this.isCallCoordinated = false; } @Override @@ -823,6 +825,14 @@ public abstract class Server { this.clientStateId = stateId; } + public void markCallCoordinated(boolean flag) { + this.isCallCoordinated = flag; + } + + public boolean isCallCoordinated() { + return this.isCallCoordinated; + } + @InterfaceStability.Unstable public void deferResponse() { this.deferredResponse = true; @@ -2521,9 +2531,31 @@ public abstract class Server { // Save the priority level assignment by the scheduler call.setPriorityLevel(callQueue.getPriorityLevel(call)); - if(alignmentContext != null) { - long stateId = alignmentContext.receiveRequestState(header); - call.setClientStateId(stateId); + if(alignmentContext != null && call.rpcRequest != null && + (call.rpcRequest instanceof ProtobufRpcEngine.RpcProtobufRequest)) { + // if call.rpcRequest is not RpcProtobufRequest, will skip the following + // step and treat the call as uncoordinated. As currently only certain + // ClientProtocol methods request made through RPC protobuf needs to be + // coordinated. + String methodName; + String protoName; + try { + ProtobufRpcEngine.RpcProtobufRequest req = + (ProtobufRpcEngine.RpcProtobufRequest) call.rpcRequest; + methodName = req.getRequestHeader().getMethodName(); + protoName = req.getRequestHeader().getDeclaringClassProtocolName(); + } catch (IOException ioe) { + throw new RpcServerException("Rpc request header check fail", ioe); + } + if (!alignmentContext.isCoordinatedCall(protoName, methodName)) { + call.markCallCoordinated(false); + } else { + call.markCallCoordinated(true); + long stateId = alignmentContext.receiveRequestState(header); + call.setClientStateId(stateId); + } + } else { + call.markCallCoordinated(false); } try { @@ -2706,8 +2738,8 @@ public abstract class Server { TraceScope traceScope = null; try { final Call call = callQueue.take(); // pop the queue; maybe blocked here - if (alignmentContext != null && call.getClientStateId() > - alignmentContext.getLastSeenStateId()) { + if (alignmentContext != null && call.isCallCoordinated() && + call.getClientStateId() > alignmentContext.getLastSeenStateId()) { /* * The call processing should be postponed until the client call's * state id is aligned (>=) with the server state id. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java index 10fa0e15e4a..6d366a63c34 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/ClientGSIContext.java @@ -44,6 +44,12 @@ public class ClientGSIContext implements AlignmentContext { return lastSeenStateId.get(); } + @Override + public boolean isCoordinatedCall(String protocolName, String method) { + throw new UnsupportedOperationException( + "Client should not be checking uncoordinated call"); + } + /** * Client side implementation only receives state alignment info. * It does not provide state alignment info therefore this does nothing. diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java index 6196500156c..bb7092cd21a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/protocol/ClientProtocol.java @@ -129,7 +129,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly(atimeAffected = true) + @ReadOnly(atimeAffected = true, isCoordinated = true) LocatedBlocks getBlockLocations(String src, long offset, long length) throws IOException; @@ -139,7 +139,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) FsServerDefaults getServerDefaults() throws IOException; /** @@ -280,7 +280,7 @@ public interface ClientProtocol { * @return All the in-use block storage policies currently. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BlockStoragePolicy[] getStoragePolicies() throws IOException; /** @@ -323,7 +323,7 @@ public interface ClientProtocol { * If file/dir src is not found */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BlockStoragePolicy getStoragePolicy(String path) throws IOException; /** @@ -690,7 +690,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) DirectoryListing getListing(String src, byte[] startAfter, boolean needLocation) throws IOException; @@ -702,7 +702,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) SnapshottableDirectoryStatus[] getSnapshottableDirListing() throws IOException; @@ -830,7 +830,7 @@ public interface ClientProtocol { * a symlink. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) long getPreferredBlockSize(String filename) throws IOException; @@ -975,7 +975,7 @@ public interface ClientProtocol { * cookie returned from the previous call. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) CorruptFileBlocks listCorruptFileBlocks(String path, String cookie) throws IOException; @@ -1011,7 +1011,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) HdfsFileStatus getFileInfo(String src) throws IOException; /** @@ -1026,7 +1026,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) boolean isFileClosed(String src) throws IOException; /** @@ -1043,7 +1043,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) HdfsFileStatus getFileLinkInfo(String src) throws IOException; /** @@ -1057,7 +1057,7 @@ public interface ClientProtocol { * @throws IOException If an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) ContentSummary getContentSummary(String path) throws IOException; /** @@ -1170,7 +1170,7 @@ public interface ClientProtocol { * or an I/O error occurred */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) String getLinkTarget(String path) throws IOException; /** @@ -1241,7 +1241,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) DataEncryptionKey getDataEncryptionKey() throws IOException; /** @@ -1310,7 +1310,7 @@ public interface ClientProtocol { * @throws IOException on error */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) SnapshotDiffReport getSnapshotDiffReport(String snapshotRoot, String fromSnapshot, String toSnapshot) throws IOException; @@ -1356,7 +1356,7 @@ public interface ClientProtocol { * @return A batch of CacheDirectiveEntry objects. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listCacheDirectives( long prevId, CacheDirectiveInfo filter) throws IOException; @@ -1398,7 +1398,7 @@ public interface ClientProtocol { * @return A batch of CachePoolEntry objects. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listCachePools(String prevPool) throws IOException; @@ -1445,7 +1445,7 @@ public interface ClientProtocol { * Gets the ACLs of files and directories. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) AclStatus getAclStatus(String src) throws IOException; /** @@ -1459,7 +1459,7 @@ public interface ClientProtocol { * Get the encryption zone for a path. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) EncryptionZone getEZForPath(String src) throws IOException; @@ -1471,7 +1471,7 @@ public interface ClientProtocol { * @return Batch of encryption zones. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listEncryptionZones( long prevId) throws IOException; @@ -1496,7 +1496,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listReencryptionStatus(long prevId) throws IOException; @@ -1530,7 +1530,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) List getXAttrs(String src, List xAttrs) throws IOException; @@ -1546,7 +1546,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) List listXAttrs(String src) throws IOException; @@ -1581,7 +1581,7 @@ public interface ClientProtocol { * @throws IOException see specific implementation */ @Idempotent - @ReadOnly + @ReadOnly // TODO : after HDFS-13749 is done, change to coordinated call void checkAccess(String path, FsAction mode) throws IOException; /** @@ -1590,7 +1590,7 @@ public interface ClientProtocol { * the starting point for the inotify event stream. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) long getCurrentEditLogTxid() throws IOException; /** @@ -1598,7 +1598,7 @@ public interface ClientProtocol { * transactions for txids equal to or greater than txid. */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) EventBatchList getEditsFromTxid(long txid) throws IOException; /** @@ -1656,7 +1656,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) ErasureCodingPolicyInfo[] getErasureCodingPolicies() throws IOException; /** @@ -1665,7 +1665,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) Map getErasureCodingCodecs() throws IOException; /** @@ -1676,7 +1676,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) ErasureCodingPolicy getErasureCodingPolicy(String src) throws IOException; /** @@ -1716,7 +1716,7 @@ public interface ClientProtocol { */ @Idempotent @Deprecated - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listOpenFiles(long prevId) throws IOException; /** @@ -1731,7 +1731,7 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) BatchedEntries listOpenFiles(long prevId, EnumSet openFilesTypes, String path) throws IOException; @@ -1743,6 +1743,6 @@ public interface ClientProtocol { * @throws IOException */ @Idempotent - @ReadOnly + @ReadOnly(isCoordinated = true) void msync() throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java index 1782dcb6d84..1786ce1aef7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/ReadOnly.java @@ -44,4 +44,11 @@ public @interface ReadOnly { * is only available on the active namenode. */ boolean activeOnly() default false; + + /** + * @return if true, when processing the rpc call of the target method, the + * server side will wait if server state id is behind client (msync). If + * false, the method will be processed regardless of server side state. + */ + boolean isCoordinated() default false; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java index 00166929338..ecb9fd36247 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/GlobalStateIdContext.java @@ -18,9 +18,13 @@ package org.apache.hadoop.hdfs.server.namenode; +import java.lang.reflect.Method; +import java.util.HashSet; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.ha.HAServiceProtocol; +import org.apache.hadoop.hdfs.protocol.ClientProtocol; +import org.apache.hadoop.hdfs.server.namenode.ha.ReadOnly; import org.apache.hadoop.ipc.AlignmentContext; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; @@ -34,12 +38,23 @@ import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; class GlobalStateIdContext implements AlignmentContext { private final FSNamesystem namesystem; + private final HashSet coordinatedMethods; + /** * Server side constructor. * @param namesystem server side state provider */ GlobalStateIdContext(FSNamesystem namesystem) { this.namesystem = namesystem; + this.coordinatedMethods = new HashSet<>(); + // For now, only ClientProtocol methods can be coordinated, so only checking + // against ClientProtocol. + for (Method method : ClientProtocol.class.getDeclaredMethods()) { + if (method.isAnnotationPresent(ReadOnly.class) && + method.getAnnotationsByType(ReadOnly.class)[0].isCoordinated()) { + coordinatedMethods.add(method.getName()); + } + } } /** @@ -92,4 +107,10 @@ class GlobalStateIdContext implements AlignmentContext { public long getLastSeenStateId() { return namesystem.getFSImage().getCorrectLastAppliedOrWrittenTxId(); } + + @Override + public boolean isCoordinatedCall(String protocolName, String methodName) { + return protocolName.equals(ClientProtocol.class.getCanonicalName()) + && coordinatedMethods.contains(methodName); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index 61254c789c5..af935fd040a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -984,7 +984,7 @@ public class TestQuorumJournalManager { List streams = new ArrayList<>(); qjm.selectInputStreams(streams, 1, true, true); verifyEdits(streams, 1, 5); - IOUtils.closeStreams(streams.toArray(new Closeable[0])); + closeStreams(streams.toArray(new Closeable[0])); for (AsyncLogger logger : spies) { Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); @@ -1000,7 +1000,7 @@ public class TestQuorumJournalManager { List streams = new ArrayList<>(); qjm.selectInputStreams(streams, 1, true, false); verifyEdits(streams, 1, 5); - IOUtils.closeStreams(streams.toArray(new Closeable[0])); + closeStreams(streams.toArray(new Closeable[0])); for (AsyncLogger logger : spies) { Mockito.verify(logger, Mockito.times(1)).getJournaledEdits(1, QuorumJournalManager.QJM_RPC_MAX_TXNS_DEFAULT); @@ -1018,7 +1018,7 @@ public class TestQuorumJournalManager { List streams = new ArrayList<>(); qjm.selectInputStreams(streams, 1, true, false); verifyEdits(streams, 1, 10); - IOUtils.closeStreams(streams.toArray(new Closeable[0])); + closeStreams(streams.toArray(new Closeable[0])); } @Test @@ -1060,7 +1060,7 @@ public class TestQuorumJournalManager { // This should still succeed as the QJM should fall back to the streaming // mechanism for fetching edits verifyEdits(streams, 1, 11); - IOUtils.closeStreams(streams.toArray(new Closeable[0])); + closeStreams(streams.toArray(new Closeable[0])); for (AsyncLogger logger : spies) { Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true); @@ -1087,7 +1087,7 @@ public class TestQuorumJournalManager { // This should still succeed as the QJM should fall back to the streaming // mechanism for fetching edits verifyEdits(streams, 1, 10); - IOUtils.closeStreams(streams.toArray(new Closeable[0])); + closeStreams(streams.toArray(new Closeable[0])); for (AsyncLogger logger : spies) { Mockito.verify(logger, Mockito.times(1)).getEditLogManifest(1, true); @@ -1148,4 +1148,13 @@ public class TestQuorumJournalManager { segmentTxId); } } + + private void closeStreams(java.io.Closeable... closeables) + throws IOException { + for (java.io.Closeable c : closeables) { + if (c != null) { + c.close(); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java index 16371b10308..89bfffb4494 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestObserverNode.java @@ -23,6 +23,7 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.qjournal.MiniQJMHACluster; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.io.retry.FailoverProxyProvider; @@ -342,6 +343,57 @@ public class TestObserverNode { assertEquals(1, readStatus.get()); } + @Test + public void testUncoordinatedCall() throws Exception { + // disable fast tailing so that coordination takes time. + conf.setBoolean(DFS_HA_TAILEDITS_INPROGRESS_KEY, false); + conf.setTimeDuration(DFS_HA_LOGROLL_PERIOD_KEY, 300, TimeUnit.SECONDS); + conf.setTimeDuration( + DFS_HA_TAILEDITS_PERIOD_KEY, 200, TimeUnit.SECONDS); + setUpCluster(1); + setObserverRead(true); + + // make a write call so that client will be ahead of + // observer for now. + dfs.mkdir(testPath, FsPermission.getDefault()); + + // a status flag, initialized to 0, after reader finished, this will be + // updated to 1, -1 on error + AtomicInteger readStatus = new AtomicInteger(0); + + // create a separate thread to make a blocking read. + Thread reader = new Thread(() -> { + try { + // this read call will block until server state catches up. But due to + // configuration, this will take a very long time. + dfs.getClient().getFileInfo("/"); + readStatus.set(1); + fail("Should have been interrupted before getting here."); + } catch (IOException e) { + e.printStackTrace(); + readStatus.set(-1); + } + }); + reader.start(); + + long before = System.currentTimeMillis(); + dfs.getClient().datanodeReport(HdfsConstants.DatanodeReportType.ALL); + long after = System.currentTimeMillis(); + + // should succeed immediately, because datanodeReport is marked an + // uncoordinated call, and will not be waiting for server to catch up. + assertTrue(after - before < 200); + // by this time, reader thread should still be blocking, so the status not + // updated + assertEquals(0, readStatus.get()); + Thread.sleep(5000); + // reader thread status should still be unchanged after 5 sec... + assertEquals(0, readStatus.get()); + // and the reader thread is not dead, so it must be still waiting + assertEquals(Thread.State.WAITING, reader.getState()); + reader.interrupt(); + } + private void setUpCluster(int numObservers) throws Exception { qjmhaCluster = new MiniQJMHACluster.Builder(conf) .setNumNameNodes(2 + numObservers)