From 4d19e39fd7b800cb081c8042cd2bf6c895fb3fef Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Fri, 13 Oct 2017 14:22:21 -0700 Subject: [PATCH] HDFS-12553. Add nameServiceId to QJournalProtocol. Contributed by Bharat Viswanadham --- .../hdfs/qjournal/client/AsyncLogger.java | 2 +- .../qjournal/client/IPCLoggerChannel.java | 43 +++-- .../qjournal/client/QuorumJournalManager.java | 37 +++- .../qjournal/protocol/QJournalProtocol.java | 40 ++-- .../hdfs/qjournal/protocol/RequestInfo.java | 11 +- ...JournalProtocolServerSideTranslatorPB.java | 27 ++- .../QJournalProtocolTranslatorPB.java | 178 +++++++++++------- .../hadoop/hdfs/qjournal/server/Journal.java | 15 +- .../hdfs/qjournal/server/JournalNode.java | 74 ++++++-- .../qjournal/server/JournalNodeRpcServer.java | 71 ++++--- .../qjournal/server/JournalNodeSyncer.java | 26 ++- .../hdfs/server/namenode/FSEditLog.java | 14 +- .../src/main/proto/QJournalProtocol.proto | 12 ++ .../qjournal/client/TestEpochsAreUnique.java | 4 +- .../qjournal/client/TestQJMWithFaults.java | 12 +- .../client/TestQuorumJournalManager.java | 5 +- .../hdfs/qjournal/server/TestJournal.java | 8 +- .../hdfs/qjournal/server/TestJournalNode.java | 97 ++++++++++ .../qjournal/server/TestJournalNodeSync.java | 7 + 19 files changed, 505 insertions(+), 178 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java index 8504e8008a3..d2b48ccec53 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java @@ -49,7 +49,7 @@ interface AsyncLogger { interface Factory { AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, - String journalId, InetSocketAddress addr); + String journalId, String nameServiceId, InetSocketAddress addr); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java index 6cd892c7ab4..30367357b82 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java @@ -95,6 +95,8 @@ public class IPCLoggerChannel implements AsyncLogger { private long committedTxId = HdfsServerConstants.INVALID_TXID; private final String journalId; + private final String nameServiceId; + private final NamespaceInfo nsInfo; private URL httpServerURL; @@ -152,8 +154,8 @@ public class IPCLoggerChannel implements AsyncLogger { static final Factory FACTORY = new AsyncLogger.Factory() { @Override public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, - String journalId, InetSocketAddress addr) { - return new IPCLoggerChannel(conf, nsInfo, journalId, addr); + String journalId, String nameServiceId, InetSocketAddress addr) { + return new IPCLoggerChannel(conf, nsInfo, journalId, nameServiceId, addr); } }; @@ -161,11 +163,19 @@ public class IPCLoggerChannel implements AsyncLogger { NamespaceInfo nsInfo, String journalId, InetSocketAddress addr) { + this(conf, nsInfo, journalId, null, addr); + } + + public IPCLoggerChannel(Configuration conf, + NamespaceInfo nsInfo, + String journalId, + String nameServiceId, + InetSocketAddress addr) { this.conf = conf; this.nsInfo = nsInfo; this.journalId = journalId; + this.nameServiceId = nameServiceId; this.addr = addr; - this.queueSizeLimitBytes = 1024 * 1024 * conf.getInt( DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT); @@ -286,7 +296,8 @@ public class IPCLoggerChannel implements AsyncLogger { private synchronized RequestInfo createReqInfo() { Preconditions.checkState(epoch > 0, "bad epoch: " + epoch); - return new RequestInfo(journalId, epoch, ipcSerial++, committedTxId); + return new RequestInfo(journalId, nameServiceId, + epoch, ipcSerial++, committedTxId); } @VisibleForTesting @@ -330,7 +341,7 @@ public class IPCLoggerChannel implements AsyncLogger { return singleThreadExecutor.submit(new Callable() { @Override public Boolean call() throws IOException { - return getProxy().isFormatted(journalId); + return getProxy().isFormatted(journalId, nameServiceId); } }); } @@ -341,7 +352,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public GetJournalStateResponseProto call() throws IOException { GetJournalStateResponseProto ret = - getProxy().getJournalState(journalId); + getProxy().getJournalState(journalId, nameServiceId); constructHttpServerURI(ret); return ret; } @@ -354,7 +365,7 @@ public class IPCLoggerChannel implements AsyncLogger { return singleThreadExecutor.submit(new Callable() { @Override public NewEpochResponseProto call() throws IOException { - return getProxy().newEpoch(journalId, nsInfo, epoch); + return getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch); } }); } @@ -495,7 +506,7 @@ public class IPCLoggerChannel implements AsyncLogger { return singleThreadExecutor.submit(new Callable() { @Override public Void call() throws Exception { - getProxy().format(journalId, nsInfo); + getProxy().format(journalId, nameServiceId, nsInfo); return null; } }); @@ -554,7 +565,7 @@ public class IPCLoggerChannel implements AsyncLogger { @Override public RemoteEditLogManifest call() throws IOException { GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest( - journalId, fromTxnId, inProgressOk); + journalId, nameServiceId, fromTxnId, inProgressOk); // Update the http port, since we need this to build URLs to any of the // returned logs. constructHttpServerURI(ret); @@ -573,7 +584,7 @@ public class IPCLoggerChannel implements AsyncLogger { // force an RPC call so we know what the HTTP port should be if it // haven't done so. GetJournalStateResponseProto ret = getProxy().getJournalState( - journalId); + journalId, nameServiceId); constructHttpServerURI(ret); } return getProxy().prepareRecovery(createReqInfo(), segmentTxId); @@ -620,7 +631,7 @@ public class IPCLoggerChannel implements AsyncLogger { return singleThreadExecutor.submit(new Callable() { @Override public Void call() throws IOException { - getProxy().doFinalize(journalId); + getProxy().doFinalize(journalId, nameServiceId); return null; } }); @@ -632,8 +643,8 @@ public class IPCLoggerChannel implements AsyncLogger { return singleThreadExecutor.submit(new Callable() { @Override public Boolean call() throws IOException { - return getProxy().canRollBack(journalId, storage, prevStorage, - targetLayoutVersion); + return getProxy().canRollBack(journalId, nameServiceId, + storage, prevStorage, targetLayoutVersion); } }); } @@ -643,7 +654,7 @@ public class IPCLoggerChannel implements AsyncLogger { return singleThreadExecutor.submit(new Callable() { @Override public Void call() throws IOException { - getProxy().doRollback(journalId); + getProxy().doRollback(journalId, nameServiceId); return null; } }); @@ -654,7 +665,7 @@ public class IPCLoggerChannel implements AsyncLogger { return singleThreadExecutor.submit(new Callable() { @Override public Void call() throws IOException { - getProxy().discardSegments(journalId, startTxId); + getProxy().discardSegments(journalId, nameServiceId, startTxId); return null; } }); @@ -665,7 +676,7 @@ public class IPCLoggerChannel implements AsyncLogger { return singleThreadExecutor.submit(new Callable() { @Override public Long call() throws IOException { - return getProxy().getJournalCTime(journalId); + return getProxy().getJournalCTime(journalId, nameServiceId); } }); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java index f66e2c0cdde..d30625b533d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java @@ -85,26 +85,44 @@ public class QuorumJournalManager implements JournalManager { private final Configuration conf; private final URI uri; private final NamespaceInfo nsInfo; + private final String nameServiceId; private boolean isActiveWriter; private final AsyncLoggerSet loggers; private int outputBufferCapacity = 512 * 1024; private final URLConnectionFactory connectionFactory; - + + @VisibleForTesting public QuorumJournalManager(Configuration conf, - URI uri, NamespaceInfo nsInfo) throws IOException { - this(conf, uri, nsInfo, IPCLoggerChannel.FACTORY); + URI uri, + NamespaceInfo nsInfo) throws IOException { + this(conf, uri, nsInfo, null, IPCLoggerChannel.FACTORY); } + public QuorumJournalManager(Configuration conf, + URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException { + this(conf, uri, nsInfo, nameServiceId, IPCLoggerChannel.FACTORY); + } + + @VisibleForTesting QuorumJournalManager(Configuration conf, - URI uri, NamespaceInfo nsInfo, + URI uri, NamespaceInfo nsInfo, + AsyncLogger.Factory loggerFactory) throws IOException { + this(conf, uri, nsInfo, null, loggerFactory); + + } + + + QuorumJournalManager(Configuration conf, + URI uri, NamespaceInfo nsInfo, String nameServiceId, AsyncLogger.Factory loggerFactory) throws IOException { Preconditions.checkArgument(conf != null, "must be configured"); this.conf = conf; this.uri = uri; this.nsInfo = nsInfo; + this.nameServiceId = nameServiceId; this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory)); this.connectionFactory = URLConnectionFactory .newDefaultURLConnectionFactory(conf); @@ -142,7 +160,7 @@ public class QuorumJournalManager implements JournalManager { protected List createLoggers( AsyncLogger.Factory factory) throws IOException { - return createLoggers(conf, uri, nsInfo, factory); + return createLoggers(conf, uri, nsInfo, factory, nameServiceId); } static String parseJournalId(URI uri) { @@ -354,8 +372,11 @@ public class QuorumJournalManager implements JournalManager { } static List createLoggers(Configuration conf, - URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory) - throws IOException { + URI uri, + NamespaceInfo nsInfo, + AsyncLogger.Factory factory, + String nameServiceId) + throws IOException { List ret = Lists.newArrayList(); List addrs = Util.getAddressesList(uri); if (addrs.size() % 2 == 0) { @@ -364,7 +385,7 @@ public class QuorumJournalManager implements JournalManager { } String jid = parseJournalId(uri); for (InetSocketAddress addr : addrs) { - ret.add(factory.createLogger(conf, nsInfo, jid, addr)); + ret.add(factory.createLogger(conf, nsInfo, jid, nameServiceId, addr)); } return ret; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java index cb3920f5a82..5558bd54721 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java @@ -53,26 +53,30 @@ public interface QJournalProtocol { * @return true if the given journal has been formatted and * contains valid data. */ - public boolean isFormatted(String journalId) throws IOException; + boolean isFormatted(String journalId, + String nameServiceId) throws IOException; /** * Get the current state of the journal, including the most recent * epoch number and the HTTP port. */ - public GetJournalStateResponseProto getJournalState(String journalId) + GetJournalStateResponseProto getJournalState(String journalId, + String nameServiceId) throws IOException; /** * Format the underlying storage for the given namespace. */ - public void format(String journalId, + void format(String journalId, String nameServiceId, NamespaceInfo nsInfo) throws IOException; /** * Begin a new epoch. See the HDFS-3077 design doc for details. */ - public NewEpochResponseProto newEpoch(String journalId, - NamespaceInfo nsInfo, long epoch) throws IOException; + NewEpochResponseProto newEpoch(String journalId, + String nameServiceId, + NamespaceInfo nsInfo, + long epoch) throws IOException; /** * Journal edit records. @@ -130,8 +134,10 @@ public interface QJournalProtocol { * segment * @return a list of edit log segments since the given transaction ID. */ - public GetEditLogManifestResponseProto getEditLogManifest(String jid, - long sinceTxId, boolean inProgressOk) + GetEditLogManifestResponseProto getEditLogManifest(String jid, + String nameServiceId, + long sinceTxId, + boolean inProgressOk) throws IOException; /** @@ -147,24 +153,30 @@ public interface QJournalProtocol { public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto stateToAccept, URL fromUrl) throws IOException; - public void doPreUpgrade(String journalId) throws IOException; + void doPreUpgrade(String journalId) throws IOException; public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException; - public void doFinalize(String journalId) throws IOException; + void doFinalize(String journalId, + String nameServiceid) throws IOException; - public Boolean canRollBack(String journalId, StorageInfo storage, - StorageInfo prevStorage, int targetLayoutVersion) throws IOException; + Boolean canRollBack(String journalId, String nameServiceid, + StorageInfo storage, StorageInfo prevStorage, + int targetLayoutVersion) throws IOException; - public void doRollback(String journalId) throws IOException; + void doRollback(String journalId, + String nameServiceid) throws IOException; /** * Discard journal segments whose first TxId is greater than or equal to the * given txid. */ @Idempotent - public void discardSegments(String journalId, long startTxId) + void discardSegments(String journalId, + String nameServiceId, + long startTxId) throws IOException; - public Long getJournalCTime(String journalId) throws IOException; + Long getJournalCTime(String journalId, + String nameServiceId) throws IOException; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java index 2569aadd463..01e641c194c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/RequestInfo.java @@ -26,15 +26,22 @@ public class RequestInfo { private long epoch; private long ipcSerialNumber; private final long committedTxId; + private final String nameServiceId; - public RequestInfo(String jid, long epoch, long ipcSerialNumber, - long committedTxId) { + public RequestInfo(String jid, String nameServiceId, + long epoch, long ipcSerialNumber, + long committedTxId) { this.jid = jid; + this.nameServiceId = nameServiceId; this.epoch = epoch; this.ipcSerialNumber = ipcSerialNumber; this.committedTxId = committedTxId; } + public String getNameServiceId() { + return nameServiceId; + } + public long getEpoch() { return epoch; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java index 28f77f26fe3..865d2969220 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java @@ -101,7 +101,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP IsFormattedRequestProto request) throws ServiceException { try { boolean ret = impl.isFormatted( - convert(request.getJid())); + convert(request.getJid()), + request.hasNameServiceId() ? request.getNameServiceId() : null); return IsFormattedResponseProto.newBuilder() .setIsFormatted(ret) .build(); @@ -116,7 +117,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP GetJournalStateRequestProto request) throws ServiceException { try { return impl.getJournalState( - convert(request.getJid())); + convert(request.getJid()), + request.hasNameServiceId() ? request.getNameServiceId() : null); } catch (IOException ioe) { throw new ServiceException(ioe); } @@ -132,6 +134,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP try { return impl.newEpoch( request.getJid().getIdentifier(), + request.hasNameServiceId() ? request.getNameServiceId() : null, PBHelper.convert(request.getNsInfo()), request.getEpoch()); } catch (IOException ioe) { @@ -143,6 +146,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP FormatRequestProto request) throws ServiceException { try { impl.format(request.getJid().getIdentifier(), + request.hasNameServiceId() ? request.getNameServiceId() : null, PBHelper.convert(request.getNsInfo())); return FormatResponseProto.getDefaultInstance(); } catch (IOException ioe) { @@ -223,6 +227,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP try { return impl.getEditLogManifest( request.getJid().getIdentifier(), + request.hasNameServiceId() ? request.getNameServiceId() : null, request.getSinceTxId(), request.getInProgressOk()); } catch (IOException e) { @@ -260,6 +265,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP QJournalProtocolProtos.RequestInfoProto reqInfo) { return new RequestInfo( reqInfo.getJournalId().getIdentifier(), + reqInfo.hasNameServiceId() ? + reqInfo.getNameServiceId() : null, reqInfo.getEpoch(), reqInfo.getIpcSerialNumber(), reqInfo.hasCommittedTxId() ? @@ -294,7 +301,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP public DoFinalizeResponseProto doFinalize(RpcController controller, DoFinalizeRequestProto request) throws ServiceException { try { - impl.doFinalize(convert(request.getJid())); + impl.doFinalize(convert(request.getJid()), + request.hasNameServiceId() ? request.getNameServiceId() : null); return DoFinalizeResponseProto.getDefaultInstance(); } catch (IOException e) { throw new ServiceException(e); @@ -306,7 +314,9 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP CanRollBackRequestProto request) throws ServiceException { try { StorageInfo si = PBHelper.convert(request.getStorage(), NodeType.JOURNAL_NODE); - Boolean result = impl.canRollBack(convert(request.getJid()), si, + Boolean result = impl.canRollBack(convert(request.getJid()), + request.hasNameServiceId() ? request.getNameServiceId() : null, + si, PBHelper.convert(request.getPrevStorage(), NodeType.JOURNAL_NODE), request.getTargetLayoutVersion()); return CanRollBackResponseProto.newBuilder() @@ -321,7 +331,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP public DoRollbackResponseProto doRollback(RpcController controller, DoRollbackRequestProto request) throws ServiceException { try { - impl.doRollback(convert(request.getJid())); + impl.doRollback(convert(request.getJid()), request.getNameserviceId()); return DoRollbackResponseProto.getDefaultInstance(); } catch (IOException e) { throw new ServiceException(e); @@ -333,7 +343,9 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP RpcController controller, DiscardSegmentsRequestProto request) throws ServiceException { try { - impl.discardSegments(convert(request.getJid()), request.getStartTxId()); + impl.discardSegments(convert(request.getJid()), + request.hasNameServiceId() ? request.getNameServiceId() : null, + request.getStartTxId()); return DiscardSegmentsResponseProto.getDefaultInstance(); } catch (IOException e) { throw new ServiceException(e); @@ -344,7 +356,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP public GetJournalCTimeResponseProto getJournalCTime(RpcController controller, GetJournalCTimeRequestProto request) throws ServiceException { try { - Long resultCTime = impl.getJournalCTime(convert(request.getJid())); + Long resultCTime = impl.getJournalCTime(convert(request.getJid()), + request.getNameServiceId()); return GetJournalCTimeResponseProto.newBuilder() .setResultCTime(resultCTime) .build(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java index 740b5cf4130..d7cd7b55811 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java @@ -93,13 +93,17 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, @Override - public boolean isFormatted(String journalId) throws IOException { + public boolean isFormatted(String journalId, + String nameServiceId) throws IOException { try { - IsFormattedRequestProto req = IsFormattedRequestProto.newBuilder() - .setJid(convertJournalId(journalId)) - .build(); + IsFormattedRequestProto.Builder req = IsFormattedRequestProto.newBuilder() + .setJid(convertJournalId(journalId)); + if (nameServiceId != null) { + req.setNameServiceId(nameServiceId); + } + IsFormattedResponseProto resp = rpcProxy.isFormatted( - NULL_CONTROLLER, req); + NULL_CONTROLLER, req.build()); return resp.getIsFormatted(); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); @@ -107,13 +111,17 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, } @Override - public GetJournalStateResponseProto getJournalState(String jid) + public GetJournalStateResponseProto getJournalState(String jid, + String nameServiceId) throws IOException { try { - GetJournalStateRequestProto req = GetJournalStateRequestProto.newBuilder() - .setJid(convertJournalId(jid)) - .build(); - return rpcProxy.getJournalState(NULL_CONTROLLER, req); + GetJournalStateRequestProto.Builder req = GetJournalStateRequestProto + .newBuilder() + .setJid(convertJournalId(jid)); + if (nameServiceId != null) { + req.setNameServiceId(nameServiceId); + } + return rpcProxy.getJournalState(NULL_CONTROLLER, req.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -126,28 +134,39 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, } @Override - public void format(String jid, NamespaceInfo nsInfo) throws IOException { + public void format(String jid, + String nameServiceId, + NamespaceInfo nsInfo) throws IOException { try { - FormatRequestProto req = FormatRequestProto.newBuilder() + FormatRequestProto.Builder req = FormatRequestProto.newBuilder() .setJid(convertJournalId(jid)) - .setNsInfo(PBHelper.convert(nsInfo)) - .build(); - rpcProxy.format(NULL_CONTROLLER, req); + .setNsInfo(PBHelper.convert(nsInfo)); + if(nameServiceId != null) { + req.setNameServiceId(nameServiceId); + } + + rpcProxy.format(NULL_CONTROLLER, req.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } } @Override - public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo, - long epoch) throws IOException { + public NewEpochResponseProto newEpoch(String jid, + String nameServiceId, + NamespaceInfo nsInfo, + long epoch) throws IOException { try { - NewEpochRequestProto req = NewEpochRequestProto.newBuilder() - .setJid(convertJournalId(jid)) - .setNsInfo(PBHelper.convert(nsInfo)) - .setEpoch(epoch) - .build(); - return rpcProxy.newEpoch(NULL_CONTROLLER, req); + NewEpochRequestProto.Builder req = NewEpochRequestProto.newBuilder() + .setJid(convertJournalId(jid)) + .setNsInfo(PBHelper.convert(nsInfo)) + .setEpoch(epoch); + + if(nameServiceId != null) { + req.setNameServiceId(nameServiceId); + } + + return rpcProxy.newEpoch(NULL_CONTROLLER, req.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -191,6 +210,9 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, if (reqInfo.hasCommittedTxId()) { builder.setCommittedTxId(reqInfo.getCommittedTxId()); } + if(reqInfo.getNameServiceId() != null) { + builder.setNameServiceId(reqInfo.getNameServiceId()); + } return builder.build(); } @@ -239,16 +261,21 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, } @Override - public GetEditLogManifestResponseProto getEditLogManifest(String jid, - long sinceTxId, boolean inProgressOk) - throws IOException { + public GetEditLogManifestResponseProto getEditLogManifest( + String jid, String nameServiceId, + long sinceTxId, boolean inProgressOk) throws IOException { try { + GetEditLogManifestRequestProto.Builder req; + req = GetEditLogManifestRequestProto.newBuilder() + .setJid(convertJournalId(jid)) + .setSinceTxId(sinceTxId) + .setInProgressOk(inProgressOk); + if (nameServiceId !=null) { + req.setNameServiceId(nameServiceId); + } return rpcProxy.getEditLogManifest(NULL_CONTROLLER, - GetEditLogManifestRequestProto.newBuilder() - .setJid(convertJournalId(jid)) - .setSinceTxId(sinceTxId) - .setInProgressOk(inProgressOk) - .build()); + req.build() + ); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -292,10 +319,10 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, @Override public void doPreUpgrade(String jid) throws IOException { try { - rpcProxy.doPreUpgrade(NULL_CONTROLLER, - DoPreUpgradeRequestProto.newBuilder() - .setJid(convertJournalId(jid)) - .build()); + DoPreUpgradeRequestProto.Builder req; + req = DoPreUpgradeRequestProto.newBuilder() + .setJid(convertJournalId(jid)); + rpcProxy.doPreUpgrade(NULL_CONTROLLER, req.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } @@ -315,29 +342,37 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, } @Override - public void doFinalize(String jid) throws IOException { + public void doFinalize(String jid, String nameServiceId) throws IOException { try { - rpcProxy.doFinalize(NULL_CONTROLLER, - DoFinalizeRequestProto.newBuilder() - .setJid(convertJournalId(jid)) - .build()); + DoFinalizeRequestProto.Builder req = DoFinalizeRequestProto + .newBuilder() + .setJid(convertJournalId(jid)); + if (nameServiceId != null) { + req.setNameServiceId(nameServiceId); + } + rpcProxy.doFinalize(NULL_CONTROLLER, req.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } } @Override - public Boolean canRollBack(String journalId, StorageInfo storage, - StorageInfo prevStorage, int targetLayoutVersion) throws IOException { + public Boolean canRollBack(String journalId, + String nameServiceId, + StorageInfo storage, + StorageInfo prevStorage, + int targetLayoutVersion) throws IOException { try { + CanRollBackRequestProto.Builder req = CanRollBackRequestProto.newBuilder() + .setJid(convertJournalId(journalId)) + .setStorage(PBHelper.convert(storage)) + .setPrevStorage(PBHelper.convert(prevStorage)) + .setTargetLayoutVersion(targetLayoutVersion); + if (nameServiceId != null) { + req.setNameServiceId(nameServiceId); + } CanRollBackResponseProto response = rpcProxy.canRollBack( - NULL_CONTROLLER, - CanRollBackRequestProto.newBuilder() - .setJid(convertJournalId(journalId)) - .setStorage(PBHelper.convert(storage)) - .setPrevStorage(PBHelper.convert(prevStorage)) - .setTargetLayoutVersion(targetLayoutVersion) - .build()); + NULL_CONTROLLER, req.build()); return response.getCanRollBack(); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); @@ -345,38 +380,53 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface, } @Override - public void doRollback(String journalId) throws IOException { + public void doRollback(String journalId, + String nameServiceId) throws IOException { try { - rpcProxy.doRollback(NULL_CONTROLLER, - DoRollbackRequestProto.newBuilder() - .setJid(convertJournalId(journalId)) - .build()); + DoRollbackRequestProto.Builder req = DoRollbackRequestProto.newBuilder() + .setJid(convertJournalId(journalId)); + + if (nameServiceId != null) { + req.setNameserviceId(nameServiceId); + } + rpcProxy.doRollback(NULL_CONTROLLER, req.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } } @Override - public void discardSegments(String journalId, long startTxId) + public void discardSegments(String journalId, + String nameServiceId, + long startTxId) throws IOException { try { - rpcProxy.discardSegments(NULL_CONTROLLER, - DiscardSegmentsRequestProto.newBuilder() - .setJid(convertJournalId(journalId)).setStartTxId(startTxId) - .build()); + DiscardSegmentsRequestProto.Builder req = DiscardSegmentsRequestProto + .newBuilder() + .setJid(convertJournalId(journalId)).setStartTxId(startTxId); + + if (nameServiceId != null) { + req.setNameServiceId(nameServiceId); + } + rpcProxy.discardSegments(NULL_CONTROLLER, req.build()); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); } } @Override - public Long getJournalCTime(String journalId) throws IOException { + public Long getJournalCTime(String journalId, + String nameServiceId) throws IOException { try { + + GetJournalCTimeRequestProto.Builder req = GetJournalCTimeRequestProto + .newBuilder() + .setJid(convertJournalId(journalId)); + if(nameServiceId !=null) { + req.setNameServiceId(nameServiceId); + } GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime( - NULL_CONTROLLER, - GetJournalCTimeRequestProto.newBuilder() - .setJid(convertJournalId(journalId)) - .build()); + NULL_CONTROLLER, req.build()); return response.getResultCTime(); } catch (ServiceException e) { throw ProtobufHelper.getRemoteException(e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java index 0f4091dcb23..408ce76d115 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java @@ -137,6 +137,11 @@ public class Journal implements Closeable { private long lastJournalTimestamp = 0; + // This variable tracks, have we tried to start journalsyncer + // with nameServiceId. This will help not to start the journalsyncer + // on each rpc call, if it has failed to start + private boolean triedJournalSyncerStartedwithnsId = false; + /** * Time threshold for sync calls, beyond which a warning should be logged to the console. */ @@ -160,6 +165,14 @@ public class Journal implements Closeable { } } + public void setTriedJournalSyncerStartedwithnsId(boolean started) { + this.triedJournalSyncerStartedwithnsId = started; + } + + public boolean getTriedJournalSyncerStartedwithnsId() { + return triedJournalSyncerStartedwithnsId; + } + /** * Reload any data that may have been cached. This is necessary * when we first load the Journal, but also after any formatting @@ -660,7 +673,7 @@ public class Journal implements Closeable { } /** - * @see QJournalProtocol#getEditLogManifest(String, long, boolean) + * @see QJournalProtocol#getEditLogManifest(String, String, long, boolean) */ public RemoteEditLogManifest getEditLogManifest(long sinceTxId, boolean inProgressOk) throws IOException { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java index f56848cfdee..0954eaf6c17 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNode.java @@ -86,7 +86,9 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean { */ private int resultCode = 0; - synchronized Journal getOrCreateJournal(String jid, StartupOption startOpt) + synchronized Journal getOrCreateJournal(String jid, + String nameServiceId, + StartupOption startOpt) throws IOException { QuorumJournalManager.checkJournalId(jid); @@ -101,22 +103,46 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean { if (conf.getBoolean( DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT)) { - startSyncer(journal, jid); + startSyncer(journal, jid, nameServiceId); } + } else if (journalSyncersById.get(jid) != null && + !journalSyncersById.get(jid).isJournalSyncerStarted() && + !journalsById.get(jid).getTriedJournalSyncerStartedwithnsId() && + nameServiceId != null) { + startSyncer(journal, jid, nameServiceId); } + return journal; } - private void startSyncer(Journal journal, String jid) { - JournalNodeSyncer jSyncer = new JournalNodeSyncer(this, journal, jid, conf); - journalSyncersById.put(jid, jSyncer); - jSyncer.start(); + @VisibleForTesting + public boolean getJournalSyncerStatus(String jid) { + if (journalSyncersById.get(jid) != null) { + return journalSyncersById.get(jid).isJournalSyncerStarted(); + } else { + return false; + } + } + + private void startSyncer(Journal journal, String jid, String nameServiceId) { + JournalNodeSyncer jSyncer = journalSyncersById.get(jid); + if (jSyncer == null) { + jSyncer = new JournalNodeSyncer(this, journal, jid, conf, nameServiceId); + journalSyncersById.put(jid, jSyncer); + } + jSyncer.start(nameServiceId); } @VisibleForTesting public Journal getOrCreateJournal(String jid) throws IOException { - return getOrCreateJournal(jid, StartupOption.REGULAR); + return getOrCreateJournal(jid, null, StartupOption.REGULAR); + } + + public Journal getOrCreateJournal(String jid, + String nameServiceId) + throws IOException { + return getOrCreateJournal(jid, nameServiceId, StartupOption.REGULAR); } @Override @@ -357,26 +383,40 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean { getOrCreateJournal(journalId).doUpgrade(sInfo); } - public void doFinalize(String journalId) throws IOException { - getOrCreateJournal(journalId).doFinalize(); + public void doFinalize(String journalId, + String nameServiceId) + throws IOException { + getOrCreateJournal(journalId, nameServiceId).doFinalize(); } public Boolean canRollBack(String journalId, StorageInfo storage, - StorageInfo prevStorage, int targetLayoutVersion) throws IOException { - return getOrCreateJournal(journalId, StartupOption.ROLLBACK).canRollBack( + StorageInfo prevStorage, int targetLayoutVersion, + String nameServiceId) throws IOException { + return getOrCreateJournal(journalId, + nameServiceId, StartupOption.ROLLBACK).canRollBack( storage, prevStorage, targetLayoutVersion); } - public void doRollback(String journalId) throws IOException { - getOrCreateJournal(journalId, StartupOption.ROLLBACK).doRollback(); + public void doRollback(String journalId, + String nameServiceId) throws IOException { + getOrCreateJournal(journalId, + nameServiceId, StartupOption.ROLLBACK).doRollback(); } - public void discardSegments(String journalId, long startTxId) + public void discardSegments(String journalId, long startTxId, + String nameServiceId) throws IOException { - getOrCreateJournal(journalId).discardSegments(startTxId); + getOrCreateJournal(journalId, nameServiceId).discardSegments(startTxId); } - public Long getJournalCTime(String journalId) throws IOException { - return getOrCreateJournal(journalId).getJournalCTime(); + public Long getJournalCTime(String journalId, + String nameServiceId) throws IOException { + return getOrCreateJournal(journalId, nameServiceId).getJournalCTime(); } + + @VisibleForTesting + public Journal getJournal(String jid) { + return journalsById.get(jid); + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java index 2a14ad4a57e..748a51c65cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java @@ -49,10 +49,10 @@ import org.apache.hadoop.net.NetUtils; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; + @InterfaceAudience.Private @VisibleForTesting public class JournalNodeRpcServer implements QJournalProtocol { - private static final int HANDLER_COUNT = 5; private final JournalNode jn; private Server server; @@ -117,15 +117,18 @@ public class JournalNodeRpcServer implements QJournalProtocol { } @Override - public boolean isFormatted(String journalId) throws IOException { - return jn.getOrCreateJournal(journalId).isFormatted(); + public boolean isFormatted(String journalId, + String nameServiceId) throws IOException { + return jn.getOrCreateJournal(journalId, nameServiceId).isFormatted(); } @SuppressWarnings("deprecation") @Override - public GetJournalStateResponseProto getJournalState(String journalId) + public GetJournalStateResponseProto getJournalState(String journalId, + String nameServiceId) throws IOException { - long epoch = jn.getOrCreateJournal(journalId).getLastPromisedEpoch(); + long epoch = jn.getOrCreateJournal(journalId, + nameServiceId).getLastPromisedEpoch(); return GetJournalStateResponseProto.newBuilder() .setLastPromisedEpoch(epoch) .setHttpPort(jn.getBoundHttpAddress().getPort()) @@ -135,59 +138,64 @@ public class JournalNodeRpcServer implements QJournalProtocol { @Override public NewEpochResponseProto newEpoch(String journalId, - NamespaceInfo nsInfo, + String nameServiceId, + NamespaceInfo nsInfo, long epoch) throws IOException { - return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch); + return jn.getOrCreateJournal(journalId, + nameServiceId).newEpoch(nsInfo, epoch); } @Override - public void format(String journalId, NamespaceInfo nsInfo) + public void format(String journalId, + String nameServiceId, + NamespaceInfo nsInfo) throws IOException { - jn.getOrCreateJournal(journalId).format(nsInfo); + jn.getOrCreateJournal(journalId, nameServiceId).format(nsInfo); } @Override public void journal(RequestInfo reqInfo, long segmentTxId, long firstTxnId, int numTxns, byte[] records) throws IOException { - jn.getOrCreateJournal(reqInfo.getJournalId()) + jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()) .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records); } @Override public void heartbeat(RequestInfo reqInfo) throws IOException { - jn.getOrCreateJournal(reqInfo.getJournalId()) + jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()) .heartbeat(reqInfo); } @Override public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion) throws IOException { - jn.getOrCreateJournal(reqInfo.getJournalId()) + jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()) .startLogSegment(reqInfo, txid, layoutVersion); } @Override public void finalizeLogSegment(RequestInfo reqInfo, long startTxId, long endTxId) throws IOException { - jn.getOrCreateJournal(reqInfo.getJournalId()) + jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()) .finalizeLogSegment(reqInfo, startTxId, endTxId); } @Override public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep) throws IOException { - jn.getOrCreateJournal(reqInfo.getJournalId()) + jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()) .purgeLogsOlderThan(reqInfo, minTxIdToKeep); } @SuppressWarnings("deprecation") @Override - public GetEditLogManifestResponseProto getEditLogManifest(String jid, + public GetEditLogManifestResponseProto getEditLogManifest( + String jid, String nameServiceId, long sinceTxId, boolean inProgressOk) throws IOException { - RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid) + RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId) .getEditLogManifest(sinceTxId, inProgressOk); return GetEditLogManifestResponseProto.newBuilder() @@ -200,14 +208,15 @@ public class JournalNodeRpcServer implements QJournalProtocol { @Override public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo, long segmentTxId) throws IOException { - return jn.getOrCreateJournal(reqInfo.getJournalId()) + return jn.getOrCreateJournal(reqInfo.getJournalId(), + reqInfo.getNameServiceId()) .prepareRecovery(reqInfo, segmentTxId); } @Override public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto log, URL fromUrl) throws IOException { - jn.getOrCreateJournal(reqInfo.getJournalId()) + jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId()) .acceptRecovery(reqInfo, log, fromUrl); } @@ -222,30 +231,36 @@ public class JournalNodeRpcServer implements QJournalProtocol { } @Override - public void doFinalize(String journalId) throws IOException { - jn.doFinalize(journalId); + public void doFinalize(String journalId, + String nameServiceId) throws IOException { + jn.doFinalize(journalId, nameServiceId); } @Override - public Boolean canRollBack(String journalId, StorageInfo storage, + public Boolean canRollBack(String journalId, + String nameServiceId, StorageInfo storage, StorageInfo prevStorage, int targetLayoutVersion) throws IOException { - return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion); + return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion, + nameServiceId); } @Override - public void doRollback(String journalId) throws IOException { - jn.doRollback(journalId); + public void doRollback(String journalId, + String nameServiceId) throws IOException { + jn.doRollback(journalId, nameServiceId); } @Override - public void discardSegments(String journalId, long startTxId) + public void discardSegments(String journalId, + String nameServiceId, long startTxId) throws IOException { - jn.discardSegments(journalId, startTxId); + jn.discardSegments(journalId, startTxId, nameServiceId); } @Override - public Long getJournalCTime(String journalId) throws IOException { - return jn.getJournalCTime(journalId); + public Long getJournalCTime(String journalId, + String nameServiceId) throws IOException { + return jn.getJournalCTime(journalId, nameServiceId); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java index 0155b854110..b843aa87fbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeSyncer.java @@ -65,6 +65,7 @@ public class JournalNodeSyncer { private final JournalNode jn; private final Journal journal; private final String jid; + private String nameServiceId; private final JournalIdProto jidProto; private final JNStorage jnStorage; private final Configuration conf; @@ -78,12 +79,14 @@ public class JournalNodeSyncer { private final int logSegmentTransferTimeout; private final DataTransferThrottler throttler; private final JournalMetrics metrics; + private boolean journalSyncerStarted; JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid, - Configuration conf) { + Configuration conf, String nameServiceId) { this.jn = jouranlNode; this.journal = journal; this.jid = jid; + this.nameServiceId = nameServiceId; this.jidProto = convertJournalId(this.jid); this.jnStorage = journal.getStorage(); this.conf = conf; @@ -95,6 +98,7 @@ public class JournalNodeSyncer { DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT); throttler = getThrottler(conf); metrics = journal.getMetrics(); + journalSyncerStarted = false; } void stopSync() { @@ -109,13 +113,21 @@ public class JournalNodeSyncer { } } - public void start() { - LOG.info("Starting SyncJournal daemon for journal " + jid); - if (getOtherJournalNodeProxies()) { - startSyncJournalsDaemon(); - } else { - LOG.warn("Failed to start SyncJournal daemon for journal " + jid); + public void start(String nsId) { + if (nsId != null) { + this.nameServiceId = nsId; + journal.setTriedJournalSyncerStartedwithnsId(true); } + if (!journalSyncerStarted && getOtherJournalNodeProxies()) { + LOG.info("Starting SyncJournal daemon for journal " + jid); + startSyncJournalsDaemon(); + journalSyncerStarted = true; + } + + } + + public boolean isJournalSyncerStarted() { + return journalSyncerStarted; } private boolean createEditsSyncDir() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java index a8f5bfaa32e..af2a5af94d9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSEditLog.java @@ -1805,8 +1805,20 @@ public class FSEditLog implements LogsPurgeable { try { Constructor cons = clazz.getConstructor(Configuration.class, URI.class, + NamespaceInfo.class, String.class); + String nameServiceId = conf.get(DFSConfigKeys.DFS_NAMESERVICE_ID); + return cons.newInstance(conf, uri, storage.getNamespaceInfo(), + nameServiceId); + } catch (NoSuchMethodException ne) { + try { + Constructor cons + = clazz.getConstructor(Configuration.class, URI.class, NamespaceInfo.class); - return cons.newInstance(conf, uri, storage.getNamespaceInfo()); + return cons.newInstance(conf, uri, storage.getNamespaceInfo()); + } catch (Exception e) { + throw new IllegalArgumentException("Unable to construct journal, " + + uri, e); + } } catch (Exception e) { throw new IllegalArgumentException("Unable to construct journal, " + uri, e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto index 960a21f5b6d..a37c7236a65 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto @@ -46,6 +46,7 @@ message RequestInfoProto { // request itself, eg in the case that the node has // fallen behind. optional uint64 committedTxId = 4; + optional string nameServiceId = 5; } message SegmentStateProto { @@ -73,6 +74,7 @@ message JournalRequestProto { required uint32 numTxns = 3; required bytes records = 4; required uint64 segmentTxnId = 5; + optional string nameServiceId = 6; } message JournalResponseProto { @@ -129,6 +131,7 @@ message PurgeLogsResponseProto { */ message IsFormattedRequestProto { required JournalIdProto jid = 1; + optional string nameServiceId = 2; } message IsFormattedResponseProto { @@ -140,6 +143,7 @@ message IsFormattedResponseProto { */ message GetJournalCTimeRequestProto { required JournalIdProto jid = 1; + optional string nameServiceId = 2; } message GetJournalCTimeResponseProto { @@ -172,6 +176,7 @@ message DoUpgradeResponseProto { */ message DoFinalizeRequestProto { required JournalIdProto jid = 1; + optional string nameServiceId = 2; } message DoFinalizeResponseProto { @@ -185,6 +190,7 @@ message CanRollBackRequestProto { required StorageInfoProto storage = 2; required StorageInfoProto prevStorage = 3; required int32 targetLayoutVersion = 4; + optional string nameServiceId = 5; } message CanRollBackResponseProto { @@ -196,6 +202,7 @@ message CanRollBackResponseProto { */ message DoRollbackRequestProto { required JournalIdProto jid = 1; + optional string nameserviceId = 2; } message DoRollbackResponseProto { @@ -207,6 +214,7 @@ message DoRollbackResponseProto { message DiscardSegmentsRequestProto { required JournalIdProto jid = 1; required uint64 startTxId = 2; + optional string nameServiceId = 3; } message DiscardSegmentsResponseProto { @@ -217,6 +225,7 @@ message DiscardSegmentsResponseProto { */ message GetJournalStateRequestProto { required JournalIdProto jid = 1; + optional string nameServiceId = 2; } message GetJournalStateResponseProto { @@ -232,6 +241,7 @@ message GetJournalStateResponseProto { message FormatRequestProto { required JournalIdProto jid = 1; required NamespaceInfoProto nsInfo = 2; + optional string nameServiceId = 3; } message FormatResponseProto { @@ -244,6 +254,7 @@ message NewEpochRequestProto { required JournalIdProto jid = 1; required NamespaceInfoProto nsInfo = 2; required uint64 epoch = 3; + optional string nameServiceId = 4; } message NewEpochResponseProto { @@ -259,6 +270,7 @@ message GetEditLogManifestRequestProto { // Whether or not the client will be reading from the returned streams. // optional bool forReading = 3 [default = true]; optional bool inProgressOk = 4 [default = false]; + optional string nameServiceId = 5; } message GetEditLogManifestResponseProto { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java index d57e0891a85..5101a41f0e5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java @@ -107,9 +107,9 @@ public class TestEpochsAreUnique { private class FaultyLoggerFactory implements AsyncLogger.Factory { @Override public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, - String journalId, InetSocketAddress addr) { + String journalId, String nameServiceId, InetSocketAddress addr) { AsyncLogger ch = IPCLoggerChannel.FACTORY.createLogger( - conf, nsInfo, journalId, addr); + conf, nsInfo, journalId, nameServiceId, addr); AsyncLogger spy = Mockito.spy(ch); Mockito.doAnswer(new SometimesFaulty(0.10f)) .when(spy).getJournalState(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java index 9ada40f6fda..6ad43f5835e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java @@ -61,7 +61,6 @@ import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; -import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.google.common.collect.Sets; @@ -440,9 +439,14 @@ public class TestQJMWithFaults { new WrapEveryCall(realProxy) { void beforeCall(InvocationOnMock invocation) throws Exception { rpcCount++; + + String param=""; + for (Object val : invocation.getArguments()) { + param += val +","; + } String callStr = "[" + addr + "] " + invocation.getMethod().getName() + "(" + - Joiner.on(", ").join(invocation.getArguments()) + ")"; + param + ")"; Callable inject = injections.get(rpcCount); if (inject != null) { @@ -505,7 +509,7 @@ public class TestQJMWithFaults { AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { @Override public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, - String journalId, InetSocketAddress addr) { + String journalId, String nameserviceId, InetSocketAddress addr) { return new InvocationCountingChannel(conf, nsInfo, journalId, addr); } }; @@ -520,7 +524,7 @@ public class TestQJMWithFaults { AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { @Override public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, - String journalId, InetSocketAddress addr) { + String journalId, String nameServiceId, InetSocketAddress addr) { return new RandomFaultyChannel(conf, nsInfo, journalId, addr, seedGenerator.nextLong()); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java index 8d92666630a..ce1d4042103 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java @@ -940,8 +940,9 @@ public class TestQuorumJournalManager { AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { @Override public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, - String journalId, InetSocketAddress addr) { - AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) { + String journalId, String nameServiceId, InetSocketAddress addr) { + AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, + nameServiceId, addr) { protected ExecutorService createSingleThreadExecutor() { // Don't parallelize calls to the quorum in the tests. // This makes the tests more deterministic. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java index 4c36bcb77b6..b71d69445c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java @@ -156,12 +156,12 @@ public class TestJournal { journal.startLogSegment(makeRI(1), 1, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); // Send txids 1-3, with a request indicating only 0 committed - journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3, + journal.journal(new RequestInfo(JID, null, 1, 2, 0), 1, 1, 3, QJMTestUtil.createTxnData(1, 3)); assertEquals(0, journal.getCommittedTxnId()); // Send 4-6, with request indicating that through 3 is committed. - journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3, + journal.journal(new RequestInfo(JID, null, 1, 3, 3), 1, 4, 3, QJMTestUtil.createTxnData(4, 6)); assertEquals(3, journal.getCommittedTxnId()); } @@ -195,7 +195,7 @@ public class TestJournal { @Test (timeout = 10000) public void testFormatResetsCachedValues() throws Exception { journal.newEpoch(FAKE_NSINFO, 12345L); - journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L, + journal.startLogSegment(new RequestInfo(JID, null, 12345L, 1L, 0L), 1L, NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); assertEquals(12345L, journal.getLastPromisedEpoch()); @@ -404,7 +404,7 @@ public class TestJournal { } private static RequestInfo makeRI(int serial) { - return new RequestInfo(JID, 1, serial, 0); + return new RequestInfo(JID, null, 1, serial, 0); } @Test (timeout = 10000) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java index 77b50a178eb..5bf69f0c8a6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNode.java @@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.File; +import java.io.IOException; import java.net.HttpURLConnection; import java.net.URL; import java.util.concurrent.ExecutionException; @@ -43,24 +44,30 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.StopWatch; import org.junit.After; +import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import com.google.common.base.Charsets; import com.google.common.primitives.Bytes; import com.google.common.primitives.Ints; +import org.junit.rules.TestName; import org.mockito.Mockito; public class TestJournalNode { private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( 12345, "mycluster", "my-bp", 0L); + @Rule + public TestName testName = new TestName(); private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestJournalNode.class); @@ -85,6 +92,21 @@ public class TestJournalNode { editsDir.getAbsolutePath()); conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, "0.0.0.0:0"); + if (testName.getMethodName().equals( + "testJournalNodeSyncerNotStartWhenSyncDisabled")) { + conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, + false); + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, + "qjournal://jn0:9900;jn1:9901"); + } else if (testName.getMethodName().equals( + "testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI")) { + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, + "qjournal://journal0\\:9900;journal1:9901"); + } else if (testName.getMethodName().equals( + "testJournalNodeSyncerNotStartWhenSyncEnabled")) { + conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY, + "qjournal://jn0:9900;jn1:9901"); + } jn = new JournalNode(); jn.setConf(conf); jn.start(); @@ -363,4 +385,79 @@ public class TestJournalNode { Mockito.verify(jNode).stop(1); } + @Test + public void testJournalNodeSyncerNotStartWhenSyncDisabled() + throws IOException{ + //JournalSyncer will not be started, as journalsync is not enabled + conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, false); + jn.getOrCreateJournal(journalId); + Assert.assertEquals(false, + jn.getJournalSyncerStatus(journalId)); + Assert.assertEquals(false, + jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId()); + + //Trying by passing nameserviceId still journalnodesyncer should not start + // IstriedJournalSyncerStartWithnsId should also be false + jn.getOrCreateJournal(journalId, "mycluster"); + Assert.assertEquals(false, + jn.getJournalSyncerStatus(journalId)); + Assert.assertEquals(false, + jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId()); + + } + + @Test + public void testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI() + throws IOException{ + //JournalSyncer will not be started, + // as shared edits hostnames are not resolved + jn.getOrCreateJournal(journalId); + Assert.assertEquals(false, + jn.getJournalSyncerStatus(journalId)); + Assert.assertEquals(false, + jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId()); + + //Trying by passing nameserviceId, now + // IstriedJournalSyncerStartWithnsId should be set + // but journalnode syncer will not be started, + // as hostnames are not resolved + jn.getOrCreateJournal(journalId, "mycluster"); + Assert.assertEquals(false, + jn.getJournalSyncerStatus(journalId)); + Assert.assertEquals(true, + jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId()); + + } + + @Test + public void testJournalNodeSyncerNotStartWhenSyncEnabled() + throws IOException{ + //JournalSyncer will not be started, + // as shared edits hostnames are not resolved + jn.getOrCreateJournal(journalId); + Assert.assertEquals(false, + jn.getJournalSyncerStatus(journalId)); + Assert.assertEquals(false, + jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId()); + + //Trying by passing nameserviceId and resolve hostnames + // now IstriedJournalSyncerStartWithnsId should be set + // and also journalnode syncer will also be started + setupStaticHostResolution(2, "jn"); + jn.getOrCreateJournal(journalId, "mycluster"); + Assert.assertEquals(true, + jn.getJournalSyncerStatus(journalId)); + Assert.assertEquals(true, + jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId()); + + } + + private void setupStaticHostResolution(int nameServiceIdCount, + String hostname) { + for (int i = 0; i < nameServiceIdCount; i++) { + NetUtils.addStaticResolution(hostname + i, + "localhost"); + } + } + } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java index 09ef3a57560..8de96417536 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournalNodeSync.java @@ -96,6 +96,13 @@ public class TestJournalNodeSync { @Test(timeout=30000) public void testJournalNodeSync() throws Exception { + + //As by default 3 journal nodes are started; + for(int i=0; i<3; i++) { + Assert.assertEquals(true, + jCluster.getJournalNode(i).getJournalSyncerStatus("ns1")); + } + File firstJournalDir = jCluster.getJournalDir(0, jid); File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) .getCurrentDir();