HDFS-12553. Add nameServiceId to QJournalProtocol. Contributed by Bharat Viswanadham

This commit is contained in:
Arpit Agarwal 2017-10-13 14:22:21 -07:00
parent e163f41850
commit 8dd1eeb94f
19 changed files with 505 additions and 178 deletions

View File

@ -49,7 +49,7 @@ interface AsyncLogger {
interface Factory { interface Factory {
AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr); String journalId, String nameServiceId, InetSocketAddress addr);
} }
/** /**

View File

@ -95,6 +95,8 @@ public class IPCLoggerChannel implements AsyncLogger {
private long committedTxId = HdfsServerConstants.INVALID_TXID; private long committedTxId = HdfsServerConstants.INVALID_TXID;
private final String journalId; private final String journalId;
private final String nameServiceId;
private final NamespaceInfo nsInfo; private final NamespaceInfo nsInfo;
private URL httpServerURL; private URL httpServerURL;
@ -152,8 +154,8 @@ public class IPCLoggerChannel implements AsyncLogger {
static final Factory FACTORY = new AsyncLogger.Factory() { static final Factory FACTORY = new AsyncLogger.Factory() {
@Override @Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) { String journalId, String nameServiceId, InetSocketAddress addr) {
return new IPCLoggerChannel(conf, nsInfo, journalId, addr); return new IPCLoggerChannel(conf, nsInfo, journalId, nameServiceId, addr);
} }
}; };
@ -161,11 +163,19 @@ public class IPCLoggerChannel implements AsyncLogger {
NamespaceInfo nsInfo, NamespaceInfo nsInfo,
String journalId, String journalId,
InetSocketAddress addr) { InetSocketAddress addr) {
this(conf, nsInfo, journalId, null, addr);
}
public IPCLoggerChannel(Configuration conf,
NamespaceInfo nsInfo,
String journalId,
String nameServiceId,
InetSocketAddress addr) {
this.conf = conf; this.conf = conf;
this.nsInfo = nsInfo; this.nsInfo = nsInfo;
this.journalId = journalId; this.journalId = journalId;
this.nameServiceId = nameServiceId;
this.addr = addr; this.addr = addr;
this.queueSizeLimitBytes = 1024 * 1024 * conf.getInt( this.queueSizeLimitBytes = 1024 * 1024 * conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY, DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY,
DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT); DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
@ -286,7 +296,8 @@ public class IPCLoggerChannel implements AsyncLogger {
private synchronized RequestInfo createReqInfo() { private synchronized RequestInfo createReqInfo() {
Preconditions.checkState(epoch > 0, "bad epoch: " + epoch); Preconditions.checkState(epoch > 0, "bad epoch: " + epoch);
return new RequestInfo(journalId, epoch, ipcSerial++, committedTxId); return new RequestInfo(journalId, nameServiceId,
epoch, ipcSerial++, committedTxId);
} }
@VisibleForTesting @VisibleForTesting
@ -330,7 +341,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Boolean>() { return singleThreadExecutor.submit(new Callable<Boolean>() {
@Override @Override
public Boolean call() throws IOException { public Boolean call() throws IOException {
return getProxy().isFormatted(journalId); return getProxy().isFormatted(journalId, nameServiceId);
} }
}); });
} }
@ -341,7 +352,7 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override @Override
public GetJournalStateResponseProto call() throws IOException { public GetJournalStateResponseProto call() throws IOException {
GetJournalStateResponseProto ret = GetJournalStateResponseProto ret =
getProxy().getJournalState(journalId); getProxy().getJournalState(journalId, nameServiceId);
constructHttpServerURI(ret); constructHttpServerURI(ret);
return ret; return ret;
} }
@ -354,7 +365,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<NewEpochResponseProto>() { return singleThreadExecutor.submit(new Callable<NewEpochResponseProto>() {
@Override @Override
public NewEpochResponseProto call() throws IOException { public NewEpochResponseProto call() throws IOException {
return getProxy().newEpoch(journalId, nsInfo, epoch); return getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch);
} }
}); });
} }
@ -495,7 +506,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws Exception { public Void call() throws Exception {
getProxy().format(journalId, nsInfo); getProxy().format(journalId, nameServiceId, nsInfo);
return null; return null;
} }
}); });
@ -554,7 +565,7 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override @Override
public RemoteEditLogManifest call() throws IOException { public RemoteEditLogManifest call() throws IOException {
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest( GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
journalId, fromTxnId, inProgressOk); journalId, nameServiceId, fromTxnId, inProgressOk);
// Update the http port, since we need this to build URLs to any of the // Update the http port, since we need this to build URLs to any of the
// returned logs. // returned logs.
constructHttpServerURI(ret); constructHttpServerURI(ret);
@ -573,7 +584,7 @@ public class IPCLoggerChannel implements AsyncLogger {
// force an RPC call so we know what the HTTP port should be if it // force an RPC call so we know what the HTTP port should be if it
// haven't done so. // haven't done so.
GetJournalStateResponseProto ret = getProxy().getJournalState( GetJournalStateResponseProto ret = getProxy().getJournalState(
journalId); journalId, nameServiceId);
constructHttpServerURI(ret); constructHttpServerURI(ret);
} }
return getProxy().prepareRecovery(createReqInfo(), segmentTxId); return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
@ -620,7 +631,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws IOException { public Void call() throws IOException {
getProxy().doFinalize(journalId); getProxy().doFinalize(journalId, nameServiceId);
return null; return null;
} }
}); });
@ -632,8 +643,8 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Boolean>() { return singleThreadExecutor.submit(new Callable<Boolean>() {
@Override @Override
public Boolean call() throws IOException { public Boolean call() throws IOException {
return getProxy().canRollBack(journalId, storage, prevStorage, return getProxy().canRollBack(journalId, nameServiceId,
targetLayoutVersion); storage, prevStorage, targetLayoutVersion);
} }
}); });
} }
@ -643,7 +654,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws IOException { public Void call() throws IOException {
getProxy().doRollback(journalId); getProxy().doRollback(journalId, nameServiceId);
return null; return null;
} }
}); });
@ -654,7 +665,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Void>() { return singleThreadExecutor.submit(new Callable<Void>() {
@Override @Override
public Void call() throws IOException { public Void call() throws IOException {
getProxy().discardSegments(journalId, startTxId); getProxy().discardSegments(journalId, nameServiceId, startTxId);
return null; return null;
} }
}); });
@ -665,7 +676,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Long>() { return singleThreadExecutor.submit(new Callable<Long>() {
@Override @Override
public Long call() throws IOException { public Long call() throws IOException {
return getProxy().getJournalCTime(journalId); return getProxy().getJournalCTime(journalId, nameServiceId);
} }
}); });
} }

View File

@ -85,6 +85,7 @@ public class QuorumJournalManager implements JournalManager {
private final Configuration conf; private final Configuration conf;
private final URI uri; private final URI uri;
private final NamespaceInfo nsInfo; private final NamespaceInfo nsInfo;
private final String nameServiceId;
private boolean isActiveWriter; private boolean isActiveWriter;
private final AsyncLoggerSet loggers; private final AsyncLoggerSet loggers;
@ -92,19 +93,36 @@ public class QuorumJournalManager implements JournalManager {
private int outputBufferCapacity = 512 * 1024; private int outputBufferCapacity = 512 * 1024;
private final URLConnectionFactory connectionFactory; private final URLConnectionFactory connectionFactory;
@VisibleForTesting
public QuorumJournalManager(Configuration conf, public QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo) throws IOException { URI uri,
this(conf, uri, nsInfo, IPCLoggerChannel.FACTORY); NamespaceInfo nsInfo) throws IOException {
this(conf, uri, nsInfo, null, IPCLoggerChannel.FACTORY);
} }
public QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
this(conf, uri, nsInfo, nameServiceId, IPCLoggerChannel.FACTORY);
}
@VisibleForTesting
QuorumJournalManager(Configuration conf, QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo, URI uri, NamespaceInfo nsInfo,
AsyncLogger.Factory loggerFactory) throws IOException { AsyncLogger.Factory loggerFactory) throws IOException {
this(conf, uri, nsInfo, null, loggerFactory);
}
QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo, String nameServiceId,
AsyncLogger.Factory loggerFactory) throws IOException {
Preconditions.checkArgument(conf != null, "must be configured"); Preconditions.checkArgument(conf != null, "must be configured");
this.conf = conf; this.conf = conf;
this.uri = uri; this.uri = uri;
this.nsInfo = nsInfo; this.nsInfo = nsInfo;
this.nameServiceId = nameServiceId;
this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory)); this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
this.connectionFactory = URLConnectionFactory this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf); .newDefaultURLConnectionFactory(conf);
@ -142,7 +160,7 @@ public class QuorumJournalManager implements JournalManager {
protected List<AsyncLogger> createLoggers( protected List<AsyncLogger> createLoggers(
AsyncLogger.Factory factory) throws IOException { AsyncLogger.Factory factory) throws IOException {
return createLoggers(conf, uri, nsInfo, factory); return createLoggers(conf, uri, nsInfo, factory, nameServiceId);
} }
static String parseJournalId(URI uri) { static String parseJournalId(URI uri) {
@ -354,7 +372,10 @@ public class QuorumJournalManager implements JournalManager {
} }
static List<AsyncLogger> createLoggers(Configuration conf, static List<AsyncLogger> createLoggers(Configuration conf,
URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory) URI uri,
NamespaceInfo nsInfo,
AsyncLogger.Factory factory,
String nameServiceId)
throws IOException { throws IOException {
List<AsyncLogger> ret = Lists.newArrayList(); List<AsyncLogger> ret = Lists.newArrayList();
List<InetSocketAddress> addrs = Util.getAddressesList(uri); List<InetSocketAddress> addrs = Util.getAddressesList(uri);
@ -364,7 +385,7 @@ public class QuorumJournalManager implements JournalManager {
} }
String jid = parseJournalId(uri); String jid = parseJournalId(uri);
for (InetSocketAddress addr : addrs) { for (InetSocketAddress addr : addrs) {
ret.add(factory.createLogger(conf, nsInfo, jid, addr)); ret.add(factory.createLogger(conf, nsInfo, jid, nameServiceId, addr));
} }
return ret; return ret;
} }

View File

@ -53,26 +53,30 @@ public interface QJournalProtocol {
* @return true if the given journal has been formatted and * @return true if the given journal has been formatted and
* contains valid data. * contains valid data.
*/ */
public boolean isFormatted(String journalId) throws IOException; boolean isFormatted(String journalId,
String nameServiceId) throws IOException;
/** /**
* Get the current state of the journal, including the most recent * Get the current state of the journal, including the most recent
* epoch number and the HTTP port. * epoch number and the HTTP port.
*/ */
public GetJournalStateResponseProto getJournalState(String journalId) GetJournalStateResponseProto getJournalState(String journalId,
String nameServiceId)
throws IOException; throws IOException;
/** /**
* Format the underlying storage for the given namespace. * Format the underlying storage for the given namespace.
*/ */
public void format(String journalId, void format(String journalId, String nameServiceId,
NamespaceInfo nsInfo) throws IOException; NamespaceInfo nsInfo) throws IOException;
/** /**
* Begin a new epoch. See the HDFS-3077 design doc for details. * Begin a new epoch. See the HDFS-3077 design doc for details.
*/ */
public NewEpochResponseProto newEpoch(String journalId, NewEpochResponseProto newEpoch(String journalId,
NamespaceInfo nsInfo, long epoch) throws IOException; String nameServiceId,
NamespaceInfo nsInfo,
long epoch) throws IOException;
/** /**
* Journal edit records. * Journal edit records.
@ -130,8 +134,10 @@ public interface QJournalProtocol {
* segment * segment
* @return a list of edit log segments since the given transaction ID. * @return a list of edit log segments since the given transaction ID.
*/ */
public GetEditLogManifestResponseProto getEditLogManifest(String jid, GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId, boolean inProgressOk) String nameServiceId,
long sinceTxId,
boolean inProgressOk)
throws IOException; throws IOException;
/** /**
@ -147,24 +153,30 @@ public interface QJournalProtocol {
public void acceptRecovery(RequestInfo reqInfo, public void acceptRecovery(RequestInfo reqInfo,
SegmentStateProto stateToAccept, URL fromUrl) throws IOException; SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
public void doPreUpgrade(String journalId) throws IOException; void doPreUpgrade(String journalId) throws IOException;
public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException; public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException;
public void doFinalize(String journalId) throws IOException; void doFinalize(String journalId,
String nameServiceid) throws IOException;
public Boolean canRollBack(String journalId, StorageInfo storage, Boolean canRollBack(String journalId, String nameServiceid,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException; StorageInfo storage, StorageInfo prevStorage,
int targetLayoutVersion) throws IOException;
public void doRollback(String journalId) throws IOException; void doRollback(String journalId,
String nameServiceid) throws IOException;
/** /**
* Discard journal segments whose first TxId is greater than or equal to the * Discard journal segments whose first TxId is greater than or equal to the
* given txid. * given txid.
*/ */
@Idempotent @Idempotent
public void discardSegments(String journalId, long startTxId) void discardSegments(String journalId,
String nameServiceId,
long startTxId)
throws IOException; throws IOException;
public Long getJournalCTime(String journalId) throws IOException; Long getJournalCTime(String journalId,
String nameServiceId) throws IOException;
} }

View File

@ -26,15 +26,22 @@ public class RequestInfo {
private long epoch; private long epoch;
private long ipcSerialNumber; private long ipcSerialNumber;
private final long committedTxId; private final long committedTxId;
private final String nameServiceId;
public RequestInfo(String jid, long epoch, long ipcSerialNumber, public RequestInfo(String jid, String nameServiceId,
long epoch, long ipcSerialNumber,
long committedTxId) { long committedTxId) {
this.jid = jid; this.jid = jid;
this.nameServiceId = nameServiceId;
this.epoch = epoch; this.epoch = epoch;
this.ipcSerialNumber = ipcSerialNumber; this.ipcSerialNumber = ipcSerialNumber;
this.committedTxId = committedTxId; this.committedTxId = committedTxId;
} }
public String getNameServiceId() {
return nameServiceId;
}
public long getEpoch() { public long getEpoch() {
return epoch; return epoch;
} }

View File

@ -101,7 +101,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
IsFormattedRequestProto request) throws ServiceException { IsFormattedRequestProto request) throws ServiceException {
try { try {
boolean ret = impl.isFormatted( boolean ret = impl.isFormatted(
convert(request.getJid())); convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null);
return IsFormattedResponseProto.newBuilder() return IsFormattedResponseProto.newBuilder()
.setIsFormatted(ret) .setIsFormatted(ret)
.build(); .build();
@ -116,7 +117,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
GetJournalStateRequestProto request) throws ServiceException { GetJournalStateRequestProto request) throws ServiceException {
try { try {
return impl.getJournalState( return impl.getJournalState(
convert(request.getJid())); convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null);
} catch (IOException ioe) { } catch (IOException ioe) {
throw new ServiceException(ioe); throw new ServiceException(ioe);
} }
@ -132,6 +134,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
try { try {
return impl.newEpoch( return impl.newEpoch(
request.getJid().getIdentifier(), request.getJid().getIdentifier(),
request.hasNameServiceId() ? request.getNameServiceId() : null,
PBHelper.convert(request.getNsInfo()), PBHelper.convert(request.getNsInfo()),
request.getEpoch()); request.getEpoch());
} catch (IOException ioe) { } catch (IOException ioe) {
@ -143,6 +146,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
FormatRequestProto request) throws ServiceException { FormatRequestProto request) throws ServiceException {
try { try {
impl.format(request.getJid().getIdentifier(), impl.format(request.getJid().getIdentifier(),
request.hasNameServiceId() ? request.getNameServiceId() : null,
PBHelper.convert(request.getNsInfo())); PBHelper.convert(request.getNsInfo()));
return FormatResponseProto.getDefaultInstance(); return FormatResponseProto.getDefaultInstance();
} catch (IOException ioe) { } catch (IOException ioe) {
@ -223,6 +227,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
try { try {
return impl.getEditLogManifest( return impl.getEditLogManifest(
request.getJid().getIdentifier(), request.getJid().getIdentifier(),
request.hasNameServiceId() ? request.getNameServiceId() : null,
request.getSinceTxId(), request.getSinceTxId(),
request.getInProgressOk()); request.getInProgressOk());
} catch (IOException e) { } catch (IOException e) {
@ -260,6 +265,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
QJournalProtocolProtos.RequestInfoProto reqInfo) { QJournalProtocolProtos.RequestInfoProto reqInfo) {
return new RequestInfo( return new RequestInfo(
reqInfo.getJournalId().getIdentifier(), reqInfo.getJournalId().getIdentifier(),
reqInfo.hasNameServiceId() ?
reqInfo.getNameServiceId() : null,
reqInfo.getEpoch(), reqInfo.getEpoch(),
reqInfo.getIpcSerialNumber(), reqInfo.getIpcSerialNumber(),
reqInfo.hasCommittedTxId() ? reqInfo.hasCommittedTxId() ?
@ -294,7 +301,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
public DoFinalizeResponseProto doFinalize(RpcController controller, public DoFinalizeResponseProto doFinalize(RpcController controller,
DoFinalizeRequestProto request) throws ServiceException { DoFinalizeRequestProto request) throws ServiceException {
try { try {
impl.doFinalize(convert(request.getJid())); impl.doFinalize(convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null);
return DoFinalizeResponseProto.getDefaultInstance(); return DoFinalizeResponseProto.getDefaultInstance();
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
@ -306,7 +314,9 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
CanRollBackRequestProto request) throws ServiceException { CanRollBackRequestProto request) throws ServiceException {
try { try {
StorageInfo si = PBHelper.convert(request.getStorage(), NodeType.JOURNAL_NODE); StorageInfo si = PBHelper.convert(request.getStorage(), NodeType.JOURNAL_NODE);
Boolean result = impl.canRollBack(convert(request.getJid()), si, Boolean result = impl.canRollBack(convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null,
si,
PBHelper.convert(request.getPrevStorage(), NodeType.JOURNAL_NODE), PBHelper.convert(request.getPrevStorage(), NodeType.JOURNAL_NODE),
request.getTargetLayoutVersion()); request.getTargetLayoutVersion());
return CanRollBackResponseProto.newBuilder() return CanRollBackResponseProto.newBuilder()
@ -321,7 +331,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
public DoRollbackResponseProto doRollback(RpcController controller, DoRollbackRequestProto request) public DoRollbackResponseProto doRollback(RpcController controller, DoRollbackRequestProto request)
throws ServiceException { throws ServiceException {
try { try {
impl.doRollback(convert(request.getJid())); impl.doRollback(convert(request.getJid()), request.getNameserviceId());
return DoRollbackResponseProto.getDefaultInstance(); return DoRollbackResponseProto.getDefaultInstance();
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
@ -333,7 +343,9 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
RpcController controller, DiscardSegmentsRequestProto request) RpcController controller, DiscardSegmentsRequestProto request)
throws ServiceException { throws ServiceException {
try { try {
impl.discardSegments(convert(request.getJid()), request.getStartTxId()); impl.discardSegments(convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null,
request.getStartTxId());
return DiscardSegmentsResponseProto.getDefaultInstance(); return DiscardSegmentsResponseProto.getDefaultInstance();
} catch (IOException e) { } catch (IOException e) {
throw new ServiceException(e); throw new ServiceException(e);
@ -344,7 +356,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
public GetJournalCTimeResponseProto getJournalCTime(RpcController controller, public GetJournalCTimeResponseProto getJournalCTime(RpcController controller,
GetJournalCTimeRequestProto request) throws ServiceException { GetJournalCTimeRequestProto request) throws ServiceException {
try { try {
Long resultCTime = impl.getJournalCTime(convert(request.getJid())); Long resultCTime = impl.getJournalCTime(convert(request.getJid()),
request.getNameServiceId());
return GetJournalCTimeResponseProto.newBuilder() return GetJournalCTimeResponseProto.newBuilder()
.setResultCTime(resultCTime) .setResultCTime(resultCTime)
.build(); .build();

View File

@ -93,13 +93,17 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
@Override @Override
public boolean isFormatted(String journalId) throws IOException { public boolean isFormatted(String journalId,
String nameServiceId) throws IOException {
try { try {
IsFormattedRequestProto req = IsFormattedRequestProto.newBuilder() IsFormattedRequestProto.Builder req = IsFormattedRequestProto.newBuilder()
.setJid(convertJournalId(journalId)) .setJid(convertJournalId(journalId));
.build(); if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
IsFormattedResponseProto resp = rpcProxy.isFormatted( IsFormattedResponseProto resp = rpcProxy.isFormatted(
NULL_CONTROLLER, req); NULL_CONTROLLER, req.build());
return resp.getIsFormatted(); return resp.getIsFormatted();
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
@ -107,13 +111,17 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
} }
@Override @Override
public GetJournalStateResponseProto getJournalState(String jid) public GetJournalStateResponseProto getJournalState(String jid,
String nameServiceId)
throws IOException { throws IOException {
try { try {
GetJournalStateRequestProto req = GetJournalStateRequestProto.newBuilder() GetJournalStateRequestProto.Builder req = GetJournalStateRequestProto
.setJid(convertJournalId(jid)) .newBuilder()
.build(); .setJid(convertJournalId(jid));
return rpcProxy.getJournalState(NULL_CONTROLLER, req); if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
return rpcProxy.getJournalState(NULL_CONTROLLER, req.build());
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
@ -126,28 +134,39 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
} }
@Override @Override
public void format(String jid, NamespaceInfo nsInfo) throws IOException { public void format(String jid,
String nameServiceId,
NamespaceInfo nsInfo) throws IOException {
try { try {
FormatRequestProto req = FormatRequestProto.newBuilder() FormatRequestProto.Builder req = FormatRequestProto.newBuilder()
.setJid(convertJournalId(jid)) .setJid(convertJournalId(jid))
.setNsInfo(PBHelper.convert(nsInfo)) .setNsInfo(PBHelper.convert(nsInfo));
.build(); if(nameServiceId != null) {
rpcProxy.format(NULL_CONTROLLER, req); req.setNameServiceId(nameServiceId);
}
rpcProxy.format(NULL_CONTROLLER, req.build());
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
} }
@Override @Override
public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo, public NewEpochResponseProto newEpoch(String jid,
String nameServiceId,
NamespaceInfo nsInfo,
long epoch) throws IOException { long epoch) throws IOException {
try { try {
NewEpochRequestProto req = NewEpochRequestProto.newBuilder() NewEpochRequestProto.Builder req = NewEpochRequestProto.newBuilder()
.setJid(convertJournalId(jid)) .setJid(convertJournalId(jid))
.setNsInfo(PBHelper.convert(nsInfo)) .setNsInfo(PBHelper.convert(nsInfo))
.setEpoch(epoch) .setEpoch(epoch);
.build();
return rpcProxy.newEpoch(NULL_CONTROLLER, req); if(nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
return rpcProxy.newEpoch(NULL_CONTROLLER, req.build());
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
@ -191,6 +210,9 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
if (reqInfo.hasCommittedTxId()) { if (reqInfo.hasCommittedTxId()) {
builder.setCommittedTxId(reqInfo.getCommittedTxId()); builder.setCommittedTxId(reqInfo.getCommittedTxId());
} }
if(reqInfo.getNameServiceId() != null) {
builder.setNameServiceId(reqInfo.getNameServiceId());
}
return builder.build(); return builder.build();
} }
@ -239,16 +261,21 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
} }
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(
long sinceTxId, boolean inProgressOk) String jid, String nameServiceId,
throws IOException { long sinceTxId, boolean inProgressOk) throws IOException {
try { try {
return rpcProxy.getEditLogManifest(NULL_CONTROLLER, GetEditLogManifestRequestProto.Builder req;
GetEditLogManifestRequestProto.newBuilder() req = GetEditLogManifestRequestProto.newBuilder()
.setJid(convertJournalId(jid)) .setJid(convertJournalId(jid))
.setSinceTxId(sinceTxId) .setSinceTxId(sinceTxId)
.setInProgressOk(inProgressOk) .setInProgressOk(inProgressOk);
.build()); if (nameServiceId !=null) {
req.setNameServiceId(nameServiceId);
}
return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
req.build()
);
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
@ -292,10 +319,10 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
@Override @Override
public void doPreUpgrade(String jid) throws IOException { public void doPreUpgrade(String jid) throws IOException {
try { try {
rpcProxy.doPreUpgrade(NULL_CONTROLLER, DoPreUpgradeRequestProto.Builder req;
DoPreUpgradeRequestProto.newBuilder() req = DoPreUpgradeRequestProto.newBuilder()
.setJid(convertJournalId(jid)) .setJid(convertJournalId(jid));
.build()); rpcProxy.doPreUpgrade(NULL_CONTROLLER, req.build());
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
@ -315,29 +342,37 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
} }
@Override @Override
public void doFinalize(String jid) throws IOException { public void doFinalize(String jid, String nameServiceId) throws IOException {
try { try {
rpcProxy.doFinalize(NULL_CONTROLLER, DoFinalizeRequestProto.Builder req = DoFinalizeRequestProto
DoFinalizeRequestProto.newBuilder() .newBuilder()
.setJid(convertJournalId(jid)) .setJid(convertJournalId(jid));
.build()); if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
rpcProxy.doFinalize(NULL_CONTROLLER, req.build());
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
} }
@Override @Override
public Boolean canRollBack(String journalId, StorageInfo storage, public Boolean canRollBack(String journalId,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException { String nameServiceId,
StorageInfo storage,
StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
try { try {
CanRollBackResponseProto response = rpcProxy.canRollBack( CanRollBackRequestProto.Builder req = CanRollBackRequestProto.newBuilder()
NULL_CONTROLLER,
CanRollBackRequestProto.newBuilder()
.setJid(convertJournalId(journalId)) .setJid(convertJournalId(journalId))
.setStorage(PBHelper.convert(storage)) .setStorage(PBHelper.convert(storage))
.setPrevStorage(PBHelper.convert(prevStorage)) .setPrevStorage(PBHelper.convert(prevStorage))
.setTargetLayoutVersion(targetLayoutVersion) .setTargetLayoutVersion(targetLayoutVersion);
.build()); if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
CanRollBackResponseProto response = rpcProxy.canRollBack(
NULL_CONTROLLER, req.build());
return response.getCanRollBack(); return response.getCanRollBack();
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
@ -345,38 +380,53 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
} }
@Override @Override
public void doRollback(String journalId) throws IOException { public void doRollback(String journalId,
String nameServiceId) throws IOException {
try { try {
rpcProxy.doRollback(NULL_CONTROLLER, DoRollbackRequestProto.Builder req = DoRollbackRequestProto.newBuilder()
DoRollbackRequestProto.newBuilder() .setJid(convertJournalId(journalId));
.setJid(convertJournalId(journalId))
.build()); if (nameServiceId != null) {
req.setNameserviceId(nameServiceId);
}
rpcProxy.doRollback(NULL_CONTROLLER, req.build());
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
} }
@Override @Override
public void discardSegments(String journalId, long startTxId) public void discardSegments(String journalId,
String nameServiceId,
long startTxId)
throws IOException { throws IOException {
try { try {
rpcProxy.discardSegments(NULL_CONTROLLER, DiscardSegmentsRequestProto.Builder req = DiscardSegmentsRequestProto
DiscardSegmentsRequestProto.newBuilder() .newBuilder()
.setJid(convertJournalId(journalId)).setStartTxId(startTxId) .setJid(convertJournalId(journalId)).setStartTxId(startTxId);
.build());
if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
rpcProxy.discardSegments(NULL_CONTROLLER, req.build());
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);
} }
} }
@Override @Override
public Long getJournalCTime(String journalId) throws IOException { public Long getJournalCTime(String journalId,
String nameServiceId) throws IOException {
try { try {
GetJournalCTimeRequestProto.Builder req = GetJournalCTimeRequestProto
.newBuilder()
.setJid(convertJournalId(journalId));
if(nameServiceId !=null) {
req.setNameServiceId(nameServiceId);
}
GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime( GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime(
NULL_CONTROLLER, NULL_CONTROLLER, req.build());
GetJournalCTimeRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.build());
return response.getResultCTime(); return response.getResultCTime();
} catch (ServiceException e) { } catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e); throw ProtobufHelper.getRemoteException(e);

View File

@ -137,6 +137,11 @@ public class Journal implements Closeable {
private long lastJournalTimestamp = 0; private long lastJournalTimestamp = 0;
// This variable tracks, have we tried to start journalsyncer
// with nameServiceId. This will help not to start the journalsyncer
// on each rpc call, if it has failed to start
private boolean triedJournalSyncerStartedwithnsId = false;
/** /**
* Time threshold for sync calls, beyond which a warning should be logged to the console. * Time threshold for sync calls, beyond which a warning should be logged to the console.
*/ */
@ -160,6 +165,14 @@ public class Journal implements Closeable {
} }
} }
public void setTriedJournalSyncerStartedwithnsId(boolean started) {
this.triedJournalSyncerStartedwithnsId = started;
}
public boolean getTriedJournalSyncerStartedwithnsId() {
return triedJournalSyncerStartedwithnsId;
}
/** /**
* Reload any data that may have been cached. This is necessary * Reload any data that may have been cached. This is necessary
* when we first load the Journal, but also after any formatting * when we first load the Journal, but also after any formatting
@ -660,7 +673,7 @@ public class Journal implements Closeable {
} }
/** /**
* @see QJournalProtocol#getEditLogManifest(String, long, boolean) * @see QJournalProtocol#getEditLogManifest(String, String, long, boolean)
*/ */
public RemoteEditLogManifest getEditLogManifest(long sinceTxId, public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
boolean inProgressOk) throws IOException { boolean inProgressOk) throws IOException {

View File

@ -86,7 +86,9 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
*/ */
private int resultCode = 0; private int resultCode = 0;
synchronized Journal getOrCreateJournal(String jid, StartupOption startOpt) synchronized Journal getOrCreateJournal(String jid,
String nameServiceId,
StartupOption startOpt)
throws IOException { throws IOException {
QuorumJournalManager.checkJournalId(jid); QuorumJournalManager.checkJournalId(jid);
@ -101,22 +103,46 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
if (conf.getBoolean( if (conf.getBoolean(
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT)) { DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT)) {
startSyncer(journal, jid); startSyncer(journal, jid, nameServiceId);
} }
} else if (journalSyncersById.get(jid) != null &&
!journalSyncersById.get(jid).isJournalSyncerStarted() &&
!journalsById.get(jid).getTriedJournalSyncerStartedwithnsId() &&
nameServiceId != null) {
startSyncer(journal, jid, nameServiceId);
} }
return journal; return journal;
} }
private void startSyncer(Journal journal, String jid) { @VisibleForTesting
JournalNodeSyncer jSyncer = new JournalNodeSyncer(this, journal, jid, conf); public boolean getJournalSyncerStatus(String jid) {
if (journalSyncersById.get(jid) != null) {
return journalSyncersById.get(jid).isJournalSyncerStarted();
} else {
return false;
}
}
private void startSyncer(Journal journal, String jid, String nameServiceId) {
JournalNodeSyncer jSyncer = journalSyncersById.get(jid);
if (jSyncer == null) {
jSyncer = new JournalNodeSyncer(this, journal, jid, conf, nameServiceId);
journalSyncersById.put(jid, jSyncer); journalSyncersById.put(jid, jSyncer);
jSyncer.start(); }
jSyncer.start(nameServiceId);
} }
@VisibleForTesting @VisibleForTesting
public Journal getOrCreateJournal(String jid) throws IOException { public Journal getOrCreateJournal(String jid) throws IOException {
return getOrCreateJournal(jid, StartupOption.REGULAR); return getOrCreateJournal(jid, null, StartupOption.REGULAR);
}
public Journal getOrCreateJournal(String jid,
String nameServiceId)
throws IOException {
return getOrCreateJournal(jid, nameServiceId, StartupOption.REGULAR);
} }
@Override @Override
@ -357,26 +383,40 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
getOrCreateJournal(journalId).doUpgrade(sInfo); getOrCreateJournal(journalId).doUpgrade(sInfo);
} }
public void doFinalize(String journalId) throws IOException { public void doFinalize(String journalId,
getOrCreateJournal(journalId).doFinalize(); String nameServiceId)
throws IOException {
getOrCreateJournal(journalId, nameServiceId).doFinalize();
} }
public Boolean canRollBack(String journalId, StorageInfo storage, public Boolean canRollBack(String journalId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException { StorageInfo prevStorage, int targetLayoutVersion,
return getOrCreateJournal(journalId, StartupOption.ROLLBACK).canRollBack( String nameServiceId) throws IOException {
return getOrCreateJournal(journalId,
nameServiceId, StartupOption.ROLLBACK).canRollBack(
storage, prevStorage, targetLayoutVersion); storage, prevStorage, targetLayoutVersion);
} }
public void doRollback(String journalId) throws IOException { public void doRollback(String journalId,
getOrCreateJournal(journalId, StartupOption.ROLLBACK).doRollback(); String nameServiceId) throws IOException {
getOrCreateJournal(journalId,
nameServiceId, StartupOption.ROLLBACK).doRollback();
} }
public void discardSegments(String journalId, long startTxId) public void discardSegments(String journalId, long startTxId,
String nameServiceId)
throws IOException { throws IOException {
getOrCreateJournal(journalId).discardSegments(startTxId); getOrCreateJournal(journalId, nameServiceId).discardSegments(startTxId);
} }
public Long getJournalCTime(String journalId) throws IOException { public Long getJournalCTime(String journalId,
return getOrCreateJournal(journalId).getJournalCTime(); String nameServiceId) throws IOException {
return getOrCreateJournal(journalId, nameServiceId).getJournalCTime();
} }
@VisibleForTesting
public Journal getJournal(String jid) {
return journalsById.get(jid);
}
} }

View File

@ -49,10 +49,10 @@ import org.apache.hadoop.net.NetUtils;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService; import com.google.protobuf.BlockingService;
@InterfaceAudience.Private @InterfaceAudience.Private
@VisibleForTesting @VisibleForTesting
public class JournalNodeRpcServer implements QJournalProtocol { public class JournalNodeRpcServer implements QJournalProtocol {
private static final int HANDLER_COUNT = 5; private static final int HANDLER_COUNT = 5;
private final JournalNode jn; private final JournalNode jn;
private Server server; private Server server;
@ -117,15 +117,18 @@ public class JournalNodeRpcServer implements QJournalProtocol {
} }
@Override @Override
public boolean isFormatted(String journalId) throws IOException { public boolean isFormatted(String journalId,
return jn.getOrCreateJournal(journalId).isFormatted(); String nameServiceId) throws IOException {
return jn.getOrCreateJournal(journalId, nameServiceId).isFormatted();
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
public GetJournalStateResponseProto getJournalState(String journalId) public GetJournalStateResponseProto getJournalState(String journalId,
String nameServiceId)
throws IOException { throws IOException {
long epoch = jn.getOrCreateJournal(journalId).getLastPromisedEpoch(); long epoch = jn.getOrCreateJournal(journalId,
nameServiceId).getLastPromisedEpoch();
return GetJournalStateResponseProto.newBuilder() return GetJournalStateResponseProto.newBuilder()
.setLastPromisedEpoch(epoch) .setLastPromisedEpoch(epoch)
.setHttpPort(jn.getBoundHttpAddress().getPort()) .setHttpPort(jn.getBoundHttpAddress().getPort())
@ -135,59 +138,64 @@ public class JournalNodeRpcServer implements QJournalProtocol {
@Override @Override
public NewEpochResponseProto newEpoch(String journalId, public NewEpochResponseProto newEpoch(String journalId,
String nameServiceId,
NamespaceInfo nsInfo, NamespaceInfo nsInfo,
long epoch) throws IOException { long epoch) throws IOException {
return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch); return jn.getOrCreateJournal(journalId,
nameServiceId).newEpoch(nsInfo, epoch);
} }
@Override @Override
public void format(String journalId, NamespaceInfo nsInfo) public void format(String journalId,
String nameServiceId,
NamespaceInfo nsInfo)
throws IOException { throws IOException {
jn.getOrCreateJournal(journalId).format(nsInfo); jn.getOrCreateJournal(journalId, nameServiceId).format(nsInfo);
} }
@Override @Override
public void journal(RequestInfo reqInfo, public void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId, long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException { int numTxns, byte[] records) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId()) jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records); .journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
} }
@Override @Override
public void heartbeat(RequestInfo reqInfo) throws IOException { public void heartbeat(RequestInfo reqInfo) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId()) jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.heartbeat(reqInfo); .heartbeat(reqInfo);
} }
@Override @Override
public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion) public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion)
throws IOException { throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId()) jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.startLogSegment(reqInfo, txid, layoutVersion); .startLogSegment(reqInfo, txid, layoutVersion);
} }
@Override @Override
public void finalizeLogSegment(RequestInfo reqInfo, long startTxId, public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
long endTxId) throws IOException { long endTxId) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId()) jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.finalizeLogSegment(reqInfo, startTxId, endTxId); .finalizeLogSegment(reqInfo, startTxId, endTxId);
} }
@Override @Override
public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep) public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
throws IOException { throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId()) jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.purgeLogsOlderThan(reqInfo, minTxIdToKeep); .purgeLogsOlderThan(reqInfo, minTxIdToKeep);
} }
@SuppressWarnings("deprecation") @SuppressWarnings("deprecation")
@Override @Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid, public GetEditLogManifestResponseProto getEditLogManifest(
String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk) long sinceTxId, boolean inProgressOk)
throws IOException { throws IOException {
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid) RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
.getEditLogManifest(sinceTxId, inProgressOk); .getEditLogManifest(sinceTxId, inProgressOk);
return GetEditLogManifestResponseProto.newBuilder() return GetEditLogManifestResponseProto.newBuilder()
@ -200,14 +208,15 @@ public class JournalNodeRpcServer implements QJournalProtocol {
@Override @Override
public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo, public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
long segmentTxId) throws IOException { long segmentTxId) throws IOException {
return jn.getOrCreateJournal(reqInfo.getJournalId()) return jn.getOrCreateJournal(reqInfo.getJournalId(),
reqInfo.getNameServiceId())
.prepareRecovery(reqInfo, segmentTxId); .prepareRecovery(reqInfo, segmentTxId);
} }
@Override @Override
public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto log, public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto log,
URL fromUrl) throws IOException { URL fromUrl) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId()) jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.acceptRecovery(reqInfo, log, fromUrl); .acceptRecovery(reqInfo, log, fromUrl);
} }
@ -222,30 +231,36 @@ public class JournalNodeRpcServer implements QJournalProtocol {
} }
@Override @Override
public void doFinalize(String journalId) throws IOException { public void doFinalize(String journalId,
jn.doFinalize(journalId); String nameServiceId) throws IOException {
jn.doFinalize(journalId, nameServiceId);
} }
@Override @Override
public Boolean canRollBack(String journalId, StorageInfo storage, public Boolean canRollBack(String journalId,
String nameServiceId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) StorageInfo prevStorage, int targetLayoutVersion)
throws IOException { throws IOException {
return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion); return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion,
nameServiceId);
} }
@Override @Override
public void doRollback(String journalId) throws IOException { public void doRollback(String journalId,
jn.doRollback(journalId); String nameServiceId) throws IOException {
jn.doRollback(journalId, nameServiceId);
} }
@Override @Override
public void discardSegments(String journalId, long startTxId) public void discardSegments(String journalId,
String nameServiceId, long startTxId)
throws IOException { throws IOException {
jn.discardSegments(journalId, startTxId); jn.discardSegments(journalId, startTxId, nameServiceId);
} }
@Override @Override
public Long getJournalCTime(String journalId) throws IOException { public Long getJournalCTime(String journalId,
return jn.getJournalCTime(journalId); String nameServiceId) throws IOException {
return jn.getJournalCTime(journalId, nameServiceId);
} }
} }

View File

@ -65,6 +65,7 @@ public class JournalNodeSyncer {
private final JournalNode jn; private final JournalNode jn;
private final Journal journal; private final Journal journal;
private final String jid; private final String jid;
private String nameServiceId;
private final JournalIdProto jidProto; private final JournalIdProto jidProto;
private final JNStorage jnStorage; private final JNStorage jnStorage;
private final Configuration conf; private final Configuration conf;
@ -78,12 +79,14 @@ public class JournalNodeSyncer {
private final int logSegmentTransferTimeout; private final int logSegmentTransferTimeout;
private final DataTransferThrottler throttler; private final DataTransferThrottler throttler;
private final JournalMetrics metrics; private final JournalMetrics metrics;
private boolean journalSyncerStarted;
JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid, JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid,
Configuration conf) { Configuration conf, String nameServiceId) {
this.jn = jouranlNode; this.jn = jouranlNode;
this.journal = journal; this.journal = journal;
this.jid = jid; this.jid = jid;
this.nameServiceId = nameServiceId;
this.jidProto = convertJournalId(this.jid); this.jidProto = convertJournalId(this.jid);
this.jnStorage = journal.getStorage(); this.jnStorage = journal.getStorage();
this.conf = conf; this.conf = conf;
@ -95,6 +98,7 @@ public class JournalNodeSyncer {
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT); DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
throttler = getThrottler(conf); throttler = getThrottler(conf);
metrics = journal.getMetrics(); metrics = journal.getMetrics();
journalSyncerStarted = false;
} }
void stopSync() { void stopSync() {
@ -109,13 +113,21 @@ public class JournalNodeSyncer {
} }
} }
public void start() { public void start(String nsId) {
LOG.info("Starting SyncJournal daemon for journal " + jid); if (nsId != null) {
if (getOtherJournalNodeProxies()) { this.nameServiceId = nsId;
startSyncJournalsDaemon(); journal.setTriedJournalSyncerStartedwithnsId(true);
} else {
LOG.warn("Failed to start SyncJournal daemon for journal " + jid);
} }
if (!journalSyncerStarted && getOtherJournalNodeProxies()) {
LOG.info("Starting SyncJournal daemon for journal " + jid);
startSyncJournalsDaemon();
journalSyncerStarted = true;
}
}
public boolean isJournalSyncerStarted() {
return journalSyncerStarted;
} }
private boolean createEditsSyncDir() { private boolean createEditsSyncDir() {

View File

@ -1802,6 +1802,14 @@ public class FSEditLog implements LogsPurgeable {
Class<? extends JournalManager> clazz Class<? extends JournalManager> clazz
= getJournalClass(conf, uri.getScheme()); = getJournalClass(conf, uri.getScheme());
try {
Constructor<? extends JournalManager> cons
= clazz.getConstructor(Configuration.class, URI.class,
NamespaceInfo.class, String.class);
String nameServiceId = conf.get(DFSConfigKeys.DFS_NAMESERVICE_ID);
return cons.newInstance(conf, uri, storage.getNamespaceInfo(),
nameServiceId);
} catch (NoSuchMethodException ne) {
try { try {
Constructor<? extends JournalManager> cons Constructor<? extends JournalManager> cons
= clazz.getConstructor(Configuration.class, URI.class, = clazz.getConstructor(Configuration.class, URI.class,
@ -1811,6 +1819,10 @@ public class FSEditLog implements LogsPurgeable {
throw new IllegalArgumentException("Unable to construct journal, " throw new IllegalArgumentException("Unable to construct journal, "
+ uri, e); + uri, e);
} }
} catch (Exception e) {
throw new IllegalArgumentException("Unable to construct journal, "
+ uri, e);
}
} }
@VisibleForTesting @VisibleForTesting

View File

@ -46,6 +46,7 @@ message RequestInfoProto {
// request itself, eg in the case that the node has // request itself, eg in the case that the node has
// fallen behind. // fallen behind.
optional uint64 committedTxId = 4; optional uint64 committedTxId = 4;
optional string nameServiceId = 5;
} }
message SegmentStateProto { message SegmentStateProto {
@ -73,6 +74,7 @@ message JournalRequestProto {
required uint32 numTxns = 3; required uint32 numTxns = 3;
required bytes records = 4; required bytes records = 4;
required uint64 segmentTxnId = 5; required uint64 segmentTxnId = 5;
optional string nameServiceId = 6;
} }
message JournalResponseProto { message JournalResponseProto {
@ -129,6 +131,7 @@ message PurgeLogsResponseProto {
*/ */
message IsFormattedRequestProto { message IsFormattedRequestProto {
required JournalIdProto jid = 1; required JournalIdProto jid = 1;
optional string nameServiceId = 2;
} }
message IsFormattedResponseProto { message IsFormattedResponseProto {
@ -140,6 +143,7 @@ message IsFormattedResponseProto {
*/ */
message GetJournalCTimeRequestProto { message GetJournalCTimeRequestProto {
required JournalIdProto jid = 1; required JournalIdProto jid = 1;
optional string nameServiceId = 2;
} }
message GetJournalCTimeResponseProto { message GetJournalCTimeResponseProto {
@ -172,6 +176,7 @@ message DoUpgradeResponseProto {
*/ */
message DoFinalizeRequestProto { message DoFinalizeRequestProto {
required JournalIdProto jid = 1; required JournalIdProto jid = 1;
optional string nameServiceId = 2;
} }
message DoFinalizeResponseProto { message DoFinalizeResponseProto {
@ -185,6 +190,7 @@ message CanRollBackRequestProto {
required StorageInfoProto storage = 2; required StorageInfoProto storage = 2;
required StorageInfoProto prevStorage = 3; required StorageInfoProto prevStorage = 3;
required int32 targetLayoutVersion = 4; required int32 targetLayoutVersion = 4;
optional string nameServiceId = 5;
} }
message CanRollBackResponseProto { message CanRollBackResponseProto {
@ -196,6 +202,7 @@ message CanRollBackResponseProto {
*/ */
message DoRollbackRequestProto { message DoRollbackRequestProto {
required JournalIdProto jid = 1; required JournalIdProto jid = 1;
optional string nameserviceId = 2;
} }
message DoRollbackResponseProto { message DoRollbackResponseProto {
@ -207,6 +214,7 @@ message DoRollbackResponseProto {
message DiscardSegmentsRequestProto { message DiscardSegmentsRequestProto {
required JournalIdProto jid = 1; required JournalIdProto jid = 1;
required uint64 startTxId = 2; required uint64 startTxId = 2;
optional string nameServiceId = 3;
} }
message DiscardSegmentsResponseProto { message DiscardSegmentsResponseProto {
@ -217,6 +225,7 @@ message DiscardSegmentsResponseProto {
*/ */
message GetJournalStateRequestProto { message GetJournalStateRequestProto {
required JournalIdProto jid = 1; required JournalIdProto jid = 1;
optional string nameServiceId = 2;
} }
message GetJournalStateResponseProto { message GetJournalStateResponseProto {
@ -232,6 +241,7 @@ message GetJournalStateResponseProto {
message FormatRequestProto { message FormatRequestProto {
required JournalIdProto jid = 1; required JournalIdProto jid = 1;
required NamespaceInfoProto nsInfo = 2; required NamespaceInfoProto nsInfo = 2;
optional string nameServiceId = 3;
} }
message FormatResponseProto { message FormatResponseProto {
@ -244,6 +254,7 @@ message NewEpochRequestProto {
required JournalIdProto jid = 1; required JournalIdProto jid = 1;
required NamespaceInfoProto nsInfo = 2; required NamespaceInfoProto nsInfo = 2;
required uint64 epoch = 3; required uint64 epoch = 3;
optional string nameServiceId = 4;
} }
message NewEpochResponseProto { message NewEpochResponseProto {
@ -259,6 +270,7 @@ message GetEditLogManifestRequestProto {
// Whether or not the client will be reading from the returned streams. // Whether or not the client will be reading from the returned streams.
// optional bool forReading = 3 [default = true]; <obsolete, do not reuse> // optional bool forReading = 3 [default = true]; <obsolete, do not reuse>
optional bool inProgressOk = 4 [default = false]; optional bool inProgressOk = 4 [default = false];
optional string nameServiceId = 5;
} }
message GetEditLogManifestResponseProto { message GetEditLogManifestResponseProto {

View File

@ -107,9 +107,9 @@ public class TestEpochsAreUnique {
private class FaultyLoggerFactory implements AsyncLogger.Factory { private class FaultyLoggerFactory implements AsyncLogger.Factory {
@Override @Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) { String journalId, String nameServiceId, InetSocketAddress addr) {
AsyncLogger ch = IPCLoggerChannel.FACTORY.createLogger( AsyncLogger ch = IPCLoggerChannel.FACTORY.createLogger(
conf, nsInfo, journalId, addr); conf, nsInfo, journalId, nameServiceId, addr);
AsyncLogger spy = Mockito.spy(ch); AsyncLogger spy = Mockito.spy(ch);
Mockito.doAnswer(new SometimesFaulty<Long>(0.10f)) Mockito.doAnswer(new SometimesFaulty<Long>(0.10f))
.when(spy).getJournalState(); .when(spy).getJournalState();

View File

@ -61,7 +61,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
@ -440,9 +439,14 @@ public class TestQJMWithFaults {
new WrapEveryCall<Object>(realProxy) { new WrapEveryCall<Object>(realProxy) {
void beforeCall(InvocationOnMock invocation) throws Exception { void beforeCall(InvocationOnMock invocation) throws Exception {
rpcCount++; rpcCount++;
String param="";
for (Object val : invocation.getArguments()) {
param += val +",";
}
String callStr = "[" + addr + "] " + String callStr = "[" + addr + "] " +
invocation.getMethod().getName() + "(" + invocation.getMethod().getName() + "(" +
Joiner.on(", ").join(invocation.getArguments()) + ")"; param + ")";
Callable<Void> inject = injections.get(rpcCount); Callable<Void> inject = injections.get(rpcCount);
if (inject != null) { if (inject != null) {
@ -505,7 +509,7 @@ public class TestQJMWithFaults {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override @Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) { String journalId, String nameserviceId, InetSocketAddress addr) {
return new InvocationCountingChannel(conf, nsInfo, journalId, addr); return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
} }
}; };
@ -520,7 +524,7 @@ public class TestQJMWithFaults {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override @Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) { String journalId, String nameServiceId, InetSocketAddress addr) {
return new RandomFaultyChannel(conf, nsInfo, journalId, addr, return new RandomFaultyChannel(conf, nsInfo, journalId, addr,
seedGenerator.nextLong()); seedGenerator.nextLong());
} }

View File

@ -940,8 +940,9 @@ public class TestQuorumJournalManager {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() { AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override @Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo, public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) { String journalId, String nameServiceId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) { AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
nameServiceId, addr) {
protected ExecutorService createSingleThreadExecutor() { protected ExecutorService createSingleThreadExecutor() {
// Don't parallelize calls to the quorum in the tests. // Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic. // This makes the tests more deterministic.

View File

@ -156,12 +156,12 @@ public class TestJournal {
journal.startLogSegment(makeRI(1), 1, journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// Send txids 1-3, with a request indicating only 0 committed // Send txids 1-3, with a request indicating only 0 committed
journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3, journal.journal(new RequestInfo(JID, null, 1, 2, 0), 1, 1, 3,
QJMTestUtil.createTxnData(1, 3)); QJMTestUtil.createTxnData(1, 3));
assertEquals(0, journal.getCommittedTxnId()); assertEquals(0, journal.getCommittedTxnId());
// Send 4-6, with request indicating that through 3 is committed. // Send 4-6, with request indicating that through 3 is committed.
journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3, journal.journal(new RequestInfo(JID, null, 1, 3, 3), 1, 4, 3,
QJMTestUtil.createTxnData(4, 6)); QJMTestUtil.createTxnData(4, 6));
assertEquals(3, journal.getCommittedTxnId()); assertEquals(3, journal.getCommittedTxnId());
} }
@ -195,7 +195,7 @@ public class TestJournal {
@Test (timeout = 10000) @Test (timeout = 10000)
public void testFormatResetsCachedValues() throws Exception { public void testFormatResetsCachedValues() throws Exception {
journal.newEpoch(FAKE_NSINFO, 12345L); journal.newEpoch(FAKE_NSINFO, 12345L);
journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L, journal.startLogSegment(new RequestInfo(JID, null, 12345L, 1L, 0L), 1L,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION); NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
assertEquals(12345L, journal.getLastPromisedEpoch()); assertEquals(12345L, journal.getLastPromisedEpoch());
@ -404,7 +404,7 @@ public class TestJournal {
} }
private static RequestInfo makeRI(int serial) { private static RequestInfo makeRI(int serial) {
return new RequestInfo(JID, 1, serial, 0); return new RequestInfo(JID, null, 1, serial, 0);
} }
@Test (timeout = 10000) @Test (timeout = 10000)

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.io.File; import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection; import java.net.HttpURLConnection;
import java.net.URL; import java.net.URL;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
@ -43,24 +44,30 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts; import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.PathUtils; import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StopWatch; import org.apache.hadoop.util.StopWatch;
import org.junit.After; import org.junit.After;
import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import com.google.common.base.Charsets; import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes; import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints; import com.google.common.primitives.Ints;
import org.junit.rules.TestName;
import org.mockito.Mockito; import org.mockito.Mockito;
public class TestJournalNode { public class TestJournalNode {
private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo( private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
12345, "mycluster", "my-bp", 0L); 12345, "mycluster", "my-bp", 0L);
@Rule
public TestName testName = new TestName();
private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestJournalNode.class); private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestJournalNode.class);
@ -85,6 +92,21 @@ public class TestJournalNode {
editsDir.getAbsolutePath()); editsDir.getAbsolutePath());
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY, conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
"0.0.0.0:0"); "0.0.0.0:0");
if (testName.getMethodName().equals(
"testJournalNodeSyncerNotStartWhenSyncDisabled")) {
conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
false);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
"qjournal://jn0:9900;jn1:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI")) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
"qjournal://journal0\\:9900;journal1:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncerNotStartWhenSyncEnabled")) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
"qjournal://jn0:9900;jn1:9901");
}
jn = new JournalNode(); jn = new JournalNode();
jn.setConf(conf); jn.setConf(conf);
jn.start(); jn.start();
@ -363,4 +385,79 @@ public class TestJournalNode {
Mockito.verify(jNode).stop(1); Mockito.verify(jNode).stop(1);
} }
@Test
public void testJournalNodeSyncerNotStartWhenSyncDisabled()
throws IOException{
//JournalSyncer will not be started, as journalsync is not enabled
conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, false);
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
//Trying by passing nameserviceId still journalnodesyncer should not start
// IstriedJournalSyncerStartWithnsId should also be false
jn.getOrCreateJournal(journalId, "mycluster");
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}
@Test
public void testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI()
throws IOException{
//JournalSyncer will not be started,
// as shared edits hostnames are not resolved
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
//Trying by passing nameserviceId, now
// IstriedJournalSyncerStartWithnsId should be set
// but journalnode syncer will not be started,
// as hostnames are not resolved
jn.getOrCreateJournal(journalId, "mycluster");
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}
@Test
public void testJournalNodeSyncerNotStartWhenSyncEnabled()
throws IOException{
//JournalSyncer will not be started,
// as shared edits hostnames are not resolved
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
//Trying by passing nameserviceId and resolve hostnames
// now IstriedJournalSyncerStartWithnsId should be set
// and also journalnode syncer will also be started
setupStaticHostResolution(2, "jn");
jn.getOrCreateJournal(journalId, "mycluster");
Assert.assertEquals(true,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}
private void setupStaticHostResolution(int nameServiceIdCount,
String hostname) {
for (int i = 0; i < nameServiceIdCount; i++) {
NetUtils.addStaticResolution(hostname + i,
"localhost");
}
}
} }

View File

@ -96,6 +96,13 @@ public class TestJournalNodeSync {
@Test(timeout=30000) @Test(timeout=30000)
public void testJournalNodeSync() throws Exception { public void testJournalNodeSync() throws Exception {
//As by default 3 journal nodes are started;
for(int i=0; i<3; i++) {
Assert.assertEquals(true,
jCluster.getJournalNode(i).getJournalSyncerStatus("ns1"));
}
File firstJournalDir = jCluster.getJournalDir(0, jid); File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir) File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir(); .getCurrentDir();