HDFS-12553. Add nameServiceId to QJournalProtocol. Contributed by Bharat Viswanadham

This commit is contained in:
Arpit Agarwal 2017-10-13 14:22:21 -07:00
parent b7cef4e934
commit 4d19e39fd7
19 changed files with 505 additions and 178 deletions

View File

@ -49,7 +49,7 @@ interface AsyncLogger {
interface Factory {
AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr);
String journalId, String nameServiceId, InetSocketAddress addr);
}
/**

View File

@ -95,6 +95,8 @@ public class IPCLoggerChannel implements AsyncLogger {
private long committedTxId = HdfsServerConstants.INVALID_TXID;
private final String journalId;
private final String nameServiceId;
private final NamespaceInfo nsInfo;
private URL httpServerURL;
@ -152,8 +154,8 @@ public class IPCLoggerChannel implements AsyncLogger {
static final Factory FACTORY = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
return new IPCLoggerChannel(conf, nsInfo, journalId, addr);
String journalId, String nameServiceId, InetSocketAddress addr) {
return new IPCLoggerChannel(conf, nsInfo, journalId, nameServiceId, addr);
}
};
@ -161,11 +163,19 @@ public class IPCLoggerChannel implements AsyncLogger {
NamespaceInfo nsInfo,
String journalId,
InetSocketAddress addr) {
this(conf, nsInfo, journalId, null, addr);
}
public IPCLoggerChannel(Configuration conf,
NamespaceInfo nsInfo,
String journalId,
String nameServiceId,
InetSocketAddress addr) {
this.conf = conf;
this.nsInfo = nsInfo;
this.journalId = journalId;
this.nameServiceId = nameServiceId;
this.addr = addr;
this.queueSizeLimitBytes = 1024 * 1024 * conf.getInt(
DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_KEY,
DFSConfigKeys.DFS_QJOURNAL_QUEUE_SIZE_LIMIT_DEFAULT);
@ -286,7 +296,8 @@ public class IPCLoggerChannel implements AsyncLogger {
private synchronized RequestInfo createReqInfo() {
Preconditions.checkState(epoch > 0, "bad epoch: " + epoch);
return new RequestInfo(journalId, epoch, ipcSerial++, committedTxId);
return new RequestInfo(journalId, nameServiceId,
epoch, ipcSerial++, committedTxId);
}
@VisibleForTesting
@ -330,7 +341,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
return getProxy().isFormatted(journalId);
return getProxy().isFormatted(journalId, nameServiceId);
}
});
}
@ -341,7 +352,7 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override
public GetJournalStateResponseProto call() throws IOException {
GetJournalStateResponseProto ret =
getProxy().getJournalState(journalId);
getProxy().getJournalState(journalId, nameServiceId);
constructHttpServerURI(ret);
return ret;
}
@ -354,7 +365,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<NewEpochResponseProto>() {
@Override
public NewEpochResponseProto call() throws IOException {
return getProxy().newEpoch(journalId, nsInfo, epoch);
return getProxy().newEpoch(journalId, nameServiceId, nsInfo, epoch);
}
});
}
@ -495,7 +506,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
getProxy().format(journalId, nsInfo);
getProxy().format(journalId, nameServiceId, nsInfo);
return null;
}
});
@ -554,7 +565,7 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override
public RemoteEditLogManifest call() throws IOException {
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
journalId, fromTxnId, inProgressOk);
journalId, nameServiceId, fromTxnId, inProgressOk);
// Update the http port, since we need this to build URLs to any of the
// returned logs.
constructHttpServerURI(ret);
@ -573,7 +584,7 @@ public class IPCLoggerChannel implements AsyncLogger {
// force an RPC call so we know what the HTTP port should be if it
// haven't done so.
GetJournalStateResponseProto ret = getProxy().getJournalState(
journalId);
journalId, nameServiceId);
constructHttpServerURI(ret);
}
return getProxy().prepareRecovery(createReqInfo(), segmentTxId);
@ -620,7 +631,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doFinalize(journalId);
getProxy().doFinalize(journalId, nameServiceId);
return null;
}
});
@ -632,8 +643,8 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Boolean>() {
@Override
public Boolean call() throws IOException {
return getProxy().canRollBack(journalId, storage, prevStorage,
targetLayoutVersion);
return getProxy().canRollBack(journalId, nameServiceId,
storage, prevStorage, targetLayoutVersion);
}
});
}
@ -643,7 +654,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().doRollback(journalId);
getProxy().doRollback(journalId, nameServiceId);
return null;
}
});
@ -654,7 +665,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException {
getProxy().discardSegments(journalId, startTxId);
getProxy().discardSegments(journalId, nameServiceId, startTxId);
return null;
}
});
@ -665,7 +676,7 @@ public class IPCLoggerChannel implements AsyncLogger {
return singleThreadExecutor.submit(new Callable<Long>() {
@Override
public Long call() throws IOException {
return getProxy().getJournalCTime(journalId);
return getProxy().getJournalCTime(journalId, nameServiceId);
}
});
}

View File

@ -85,26 +85,44 @@ public class QuorumJournalManager implements JournalManager {
private final Configuration conf;
private final URI uri;
private final NamespaceInfo nsInfo;
private final String nameServiceId;
private boolean isActiveWriter;
private final AsyncLoggerSet loggers;
private int outputBufferCapacity = 512 * 1024;
private final URLConnectionFactory connectionFactory;
@VisibleForTesting
public QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo) throws IOException {
this(conf, uri, nsInfo, IPCLoggerChannel.FACTORY);
URI uri,
NamespaceInfo nsInfo) throws IOException {
this(conf, uri, nsInfo, null, IPCLoggerChannel.FACTORY);
}
public QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo, String nameServiceId) throws IOException {
this(conf, uri, nsInfo, nameServiceId, IPCLoggerChannel.FACTORY);
}
@VisibleForTesting
QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo,
URI uri, NamespaceInfo nsInfo,
AsyncLogger.Factory loggerFactory) throws IOException {
this(conf, uri, nsInfo, null, loggerFactory);
}
QuorumJournalManager(Configuration conf,
URI uri, NamespaceInfo nsInfo, String nameServiceId,
AsyncLogger.Factory loggerFactory) throws IOException {
Preconditions.checkArgument(conf != null, "must be configured");
this.conf = conf;
this.uri = uri;
this.nsInfo = nsInfo;
this.nameServiceId = nameServiceId;
this.loggers = new AsyncLoggerSet(createLoggers(loggerFactory));
this.connectionFactory = URLConnectionFactory
.newDefaultURLConnectionFactory(conf);
@ -142,7 +160,7 @@ public class QuorumJournalManager implements JournalManager {
protected List<AsyncLogger> createLoggers(
AsyncLogger.Factory factory) throws IOException {
return createLoggers(conf, uri, nsInfo, factory);
return createLoggers(conf, uri, nsInfo, factory, nameServiceId);
}
static String parseJournalId(URI uri) {
@ -354,8 +372,11 @@ public class QuorumJournalManager implements JournalManager {
}
static List<AsyncLogger> createLoggers(Configuration conf,
URI uri, NamespaceInfo nsInfo, AsyncLogger.Factory factory)
throws IOException {
URI uri,
NamespaceInfo nsInfo,
AsyncLogger.Factory factory,
String nameServiceId)
throws IOException {
List<AsyncLogger> ret = Lists.newArrayList();
List<InetSocketAddress> addrs = Util.getAddressesList(uri);
if (addrs.size() % 2 == 0) {
@ -364,7 +385,7 @@ public class QuorumJournalManager implements JournalManager {
}
String jid = parseJournalId(uri);
for (InetSocketAddress addr : addrs) {
ret.add(factory.createLogger(conf, nsInfo, jid, addr));
ret.add(factory.createLogger(conf, nsInfo, jid, nameServiceId, addr));
}
return ret;
}

View File

@ -53,26 +53,30 @@ public interface QJournalProtocol {
* @return true if the given journal has been formatted and
* contains valid data.
*/
public boolean isFormatted(String journalId) throws IOException;
boolean isFormatted(String journalId,
String nameServiceId) throws IOException;
/**
* Get the current state of the journal, including the most recent
* epoch number and the HTTP port.
*/
public GetJournalStateResponseProto getJournalState(String journalId)
GetJournalStateResponseProto getJournalState(String journalId,
String nameServiceId)
throws IOException;
/**
* Format the underlying storage for the given namespace.
*/
public void format(String journalId,
void format(String journalId, String nameServiceId,
NamespaceInfo nsInfo) throws IOException;
/**
* Begin a new epoch. See the HDFS-3077 design doc for details.
*/
public NewEpochResponseProto newEpoch(String journalId,
NamespaceInfo nsInfo, long epoch) throws IOException;
NewEpochResponseProto newEpoch(String journalId,
String nameServiceId,
NamespaceInfo nsInfo,
long epoch) throws IOException;
/**
* Journal edit records.
@ -130,8 +134,10 @@ public interface QJournalProtocol {
* segment
* @return a list of edit log segments since the given transaction ID.
*/
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId, boolean inProgressOk)
GetEditLogManifestResponseProto getEditLogManifest(String jid,
String nameServiceId,
long sinceTxId,
boolean inProgressOk)
throws IOException;
/**
@ -147,24 +153,30 @@ public interface QJournalProtocol {
public void acceptRecovery(RequestInfo reqInfo,
SegmentStateProto stateToAccept, URL fromUrl) throws IOException;
public void doPreUpgrade(String journalId) throws IOException;
void doPreUpgrade(String journalId) throws IOException;
public void doUpgrade(String journalId, StorageInfo sInfo) throws IOException;
public void doFinalize(String journalId) throws IOException;
void doFinalize(String journalId,
String nameServiceid) throws IOException;
public Boolean canRollBack(String journalId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException;
Boolean canRollBack(String journalId, String nameServiceid,
StorageInfo storage, StorageInfo prevStorage,
int targetLayoutVersion) throws IOException;
public void doRollback(String journalId) throws IOException;
void doRollback(String journalId,
String nameServiceid) throws IOException;
/**
* Discard journal segments whose first TxId is greater than or equal to the
* given txid.
*/
@Idempotent
public void discardSegments(String journalId, long startTxId)
void discardSegments(String journalId,
String nameServiceId,
long startTxId)
throws IOException;
public Long getJournalCTime(String journalId) throws IOException;
Long getJournalCTime(String journalId,
String nameServiceId) throws IOException;
}

View File

@ -26,15 +26,22 @@ public class RequestInfo {
private long epoch;
private long ipcSerialNumber;
private final long committedTxId;
private final String nameServiceId;
public RequestInfo(String jid, long epoch, long ipcSerialNumber,
long committedTxId) {
public RequestInfo(String jid, String nameServiceId,
long epoch, long ipcSerialNumber,
long committedTxId) {
this.jid = jid;
this.nameServiceId = nameServiceId;
this.epoch = epoch;
this.ipcSerialNumber = ipcSerialNumber;
this.committedTxId = committedTxId;
}
public String getNameServiceId() {
return nameServiceId;
}
public long getEpoch() {
return epoch;
}

View File

@ -101,7 +101,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
IsFormattedRequestProto request) throws ServiceException {
try {
boolean ret = impl.isFormatted(
convert(request.getJid()));
convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null);
return IsFormattedResponseProto.newBuilder()
.setIsFormatted(ret)
.build();
@ -116,7 +117,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
GetJournalStateRequestProto request) throws ServiceException {
try {
return impl.getJournalState(
convert(request.getJid()));
convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null);
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
@ -132,6 +134,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
try {
return impl.newEpoch(
request.getJid().getIdentifier(),
request.hasNameServiceId() ? request.getNameServiceId() : null,
PBHelper.convert(request.getNsInfo()),
request.getEpoch());
} catch (IOException ioe) {
@ -143,6 +146,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
FormatRequestProto request) throws ServiceException {
try {
impl.format(request.getJid().getIdentifier(),
request.hasNameServiceId() ? request.getNameServiceId() : null,
PBHelper.convert(request.getNsInfo()));
return FormatResponseProto.getDefaultInstance();
} catch (IOException ioe) {
@ -223,6 +227,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
try {
return impl.getEditLogManifest(
request.getJid().getIdentifier(),
request.hasNameServiceId() ? request.getNameServiceId() : null,
request.getSinceTxId(),
request.getInProgressOk());
} catch (IOException e) {
@ -260,6 +265,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
QJournalProtocolProtos.RequestInfoProto reqInfo) {
return new RequestInfo(
reqInfo.getJournalId().getIdentifier(),
reqInfo.hasNameServiceId() ?
reqInfo.getNameServiceId() : null,
reqInfo.getEpoch(),
reqInfo.getIpcSerialNumber(),
reqInfo.hasCommittedTxId() ?
@ -294,7 +301,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
public DoFinalizeResponseProto doFinalize(RpcController controller,
DoFinalizeRequestProto request) throws ServiceException {
try {
impl.doFinalize(convert(request.getJid()));
impl.doFinalize(convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null);
return DoFinalizeResponseProto.getDefaultInstance();
} catch (IOException e) {
throw new ServiceException(e);
@ -306,7 +314,9 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
CanRollBackRequestProto request) throws ServiceException {
try {
StorageInfo si = PBHelper.convert(request.getStorage(), NodeType.JOURNAL_NODE);
Boolean result = impl.canRollBack(convert(request.getJid()), si,
Boolean result = impl.canRollBack(convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null,
si,
PBHelper.convert(request.getPrevStorage(), NodeType.JOURNAL_NODE),
request.getTargetLayoutVersion());
return CanRollBackResponseProto.newBuilder()
@ -321,7 +331,7 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
public DoRollbackResponseProto doRollback(RpcController controller, DoRollbackRequestProto request)
throws ServiceException {
try {
impl.doRollback(convert(request.getJid()));
impl.doRollback(convert(request.getJid()), request.getNameserviceId());
return DoRollbackResponseProto.getDefaultInstance();
} catch (IOException e) {
throw new ServiceException(e);
@ -333,7 +343,9 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
RpcController controller, DiscardSegmentsRequestProto request)
throws ServiceException {
try {
impl.discardSegments(convert(request.getJid()), request.getStartTxId());
impl.discardSegments(convert(request.getJid()),
request.hasNameServiceId() ? request.getNameServiceId() : null,
request.getStartTxId());
return DiscardSegmentsResponseProto.getDefaultInstance();
} catch (IOException e) {
throw new ServiceException(e);
@ -344,7 +356,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
public GetJournalCTimeResponseProto getJournalCTime(RpcController controller,
GetJournalCTimeRequestProto request) throws ServiceException {
try {
Long resultCTime = impl.getJournalCTime(convert(request.getJid()));
Long resultCTime = impl.getJournalCTime(convert(request.getJid()),
request.getNameServiceId());
return GetJournalCTimeResponseProto.newBuilder()
.setResultCTime(resultCTime)
.build();

View File

@ -93,13 +93,17 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
@Override
public boolean isFormatted(String journalId) throws IOException {
public boolean isFormatted(String journalId,
String nameServiceId) throws IOException {
try {
IsFormattedRequestProto req = IsFormattedRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.build();
IsFormattedRequestProto.Builder req = IsFormattedRequestProto.newBuilder()
.setJid(convertJournalId(journalId));
if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
IsFormattedResponseProto resp = rpcProxy.isFormatted(
NULL_CONTROLLER, req);
NULL_CONTROLLER, req.build());
return resp.getIsFormatted();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
@ -107,13 +111,17 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
}
@Override
public GetJournalStateResponseProto getJournalState(String jid)
public GetJournalStateResponseProto getJournalState(String jid,
String nameServiceId)
throws IOException {
try {
GetJournalStateRequestProto req = GetJournalStateRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.build();
return rpcProxy.getJournalState(NULL_CONTROLLER, req);
GetJournalStateRequestProto.Builder req = GetJournalStateRequestProto
.newBuilder()
.setJid(convertJournalId(jid));
if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
return rpcProxy.getJournalState(NULL_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -126,28 +134,39 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
}
@Override
public void format(String jid, NamespaceInfo nsInfo) throws IOException {
public void format(String jid,
String nameServiceId,
NamespaceInfo nsInfo) throws IOException {
try {
FormatRequestProto req = FormatRequestProto.newBuilder()
FormatRequestProto.Builder req = FormatRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.setNsInfo(PBHelper.convert(nsInfo))
.build();
rpcProxy.format(NULL_CONTROLLER, req);
.setNsInfo(PBHelper.convert(nsInfo));
if(nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
rpcProxy.format(NULL_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,
long epoch) throws IOException {
public NewEpochResponseProto newEpoch(String jid,
String nameServiceId,
NamespaceInfo nsInfo,
long epoch) throws IOException {
try {
NewEpochRequestProto req = NewEpochRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.setNsInfo(PBHelper.convert(nsInfo))
.setEpoch(epoch)
.build();
return rpcProxy.newEpoch(NULL_CONTROLLER, req);
NewEpochRequestProto.Builder req = NewEpochRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.setNsInfo(PBHelper.convert(nsInfo))
.setEpoch(epoch);
if(nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
return rpcProxy.newEpoch(NULL_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -191,6 +210,9 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
if (reqInfo.hasCommittedTxId()) {
builder.setCommittedTxId(reqInfo.getCommittedTxId());
}
if(reqInfo.getNameServiceId() != null) {
builder.setNameServiceId(reqInfo.getNameServiceId());
}
return builder.build();
}
@ -239,16 +261,21 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
}
@Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId, boolean inProgressOk)
throws IOException {
public GetEditLogManifestResponseProto getEditLogManifest(
String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk) throws IOException {
try {
GetEditLogManifestRequestProto.Builder req;
req = GetEditLogManifestRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.setSinceTxId(sinceTxId)
.setInProgressOk(inProgressOk);
if (nameServiceId !=null) {
req.setNameServiceId(nameServiceId);
}
return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
GetEditLogManifestRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.setSinceTxId(sinceTxId)
.setInProgressOk(inProgressOk)
.build());
req.build()
);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -292,10 +319,10 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
@Override
public void doPreUpgrade(String jid) throws IOException {
try {
rpcProxy.doPreUpgrade(NULL_CONTROLLER,
DoPreUpgradeRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.build());
DoPreUpgradeRequestProto.Builder req;
req = DoPreUpgradeRequestProto.newBuilder()
.setJid(convertJournalId(jid));
rpcProxy.doPreUpgrade(NULL_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
@ -315,29 +342,37 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
}
@Override
public void doFinalize(String jid) throws IOException {
public void doFinalize(String jid, String nameServiceId) throws IOException {
try {
rpcProxy.doFinalize(NULL_CONTROLLER,
DoFinalizeRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.build());
DoFinalizeRequestProto.Builder req = DoFinalizeRequestProto
.newBuilder()
.setJid(convertJournalId(jid));
if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
rpcProxy.doFinalize(NULL_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Boolean canRollBack(String journalId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
public Boolean canRollBack(String journalId,
String nameServiceId,
StorageInfo storage,
StorageInfo prevStorage,
int targetLayoutVersion) throws IOException {
try {
CanRollBackRequestProto.Builder req = CanRollBackRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.setStorage(PBHelper.convert(storage))
.setPrevStorage(PBHelper.convert(prevStorage))
.setTargetLayoutVersion(targetLayoutVersion);
if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
CanRollBackResponseProto response = rpcProxy.canRollBack(
NULL_CONTROLLER,
CanRollBackRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.setStorage(PBHelper.convert(storage))
.setPrevStorage(PBHelper.convert(prevStorage))
.setTargetLayoutVersion(targetLayoutVersion)
.build());
NULL_CONTROLLER, req.build());
return response.getCanRollBack();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
@ -345,38 +380,53 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
}
@Override
public void doRollback(String journalId) throws IOException {
public void doRollback(String journalId,
String nameServiceId) throws IOException {
try {
rpcProxy.doRollback(NULL_CONTROLLER,
DoRollbackRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.build());
DoRollbackRequestProto.Builder req = DoRollbackRequestProto.newBuilder()
.setJid(convertJournalId(journalId));
if (nameServiceId != null) {
req.setNameserviceId(nameServiceId);
}
rpcProxy.doRollback(NULL_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public void discardSegments(String journalId, long startTxId)
public void discardSegments(String journalId,
String nameServiceId,
long startTxId)
throws IOException {
try {
rpcProxy.discardSegments(NULL_CONTROLLER,
DiscardSegmentsRequestProto.newBuilder()
.setJid(convertJournalId(journalId)).setStartTxId(startTxId)
.build());
DiscardSegmentsRequestProto.Builder req = DiscardSegmentsRequestProto
.newBuilder()
.setJid(convertJournalId(journalId)).setStartTxId(startTxId);
if (nameServiceId != null) {
req.setNameServiceId(nameServiceId);
}
rpcProxy.discardSegments(NULL_CONTROLLER, req.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public Long getJournalCTime(String journalId) throws IOException {
public Long getJournalCTime(String journalId,
String nameServiceId) throws IOException {
try {
GetJournalCTimeRequestProto.Builder req = GetJournalCTimeRequestProto
.newBuilder()
.setJid(convertJournalId(journalId));
if(nameServiceId !=null) {
req.setNameServiceId(nameServiceId);
}
GetJournalCTimeResponseProto response = rpcProxy.getJournalCTime(
NULL_CONTROLLER,
GetJournalCTimeRequestProto.newBuilder()
.setJid(convertJournalId(journalId))
.build());
NULL_CONTROLLER, req.build());
return response.getResultCTime();
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);

View File

@ -137,6 +137,11 @@ public class Journal implements Closeable {
private long lastJournalTimestamp = 0;
// This variable tracks, have we tried to start journalsyncer
// with nameServiceId. This will help not to start the journalsyncer
// on each rpc call, if it has failed to start
private boolean triedJournalSyncerStartedwithnsId = false;
/**
* Time threshold for sync calls, beyond which a warning should be logged to the console.
*/
@ -160,6 +165,14 @@ public class Journal implements Closeable {
}
}
public void setTriedJournalSyncerStartedwithnsId(boolean started) {
this.triedJournalSyncerStartedwithnsId = started;
}
public boolean getTriedJournalSyncerStartedwithnsId() {
return triedJournalSyncerStartedwithnsId;
}
/**
* Reload any data that may have been cached. This is necessary
* when we first load the Journal, but also after any formatting
@ -660,7 +673,7 @@ public class Journal implements Closeable {
}
/**
* @see QJournalProtocol#getEditLogManifest(String, long, boolean)
* @see QJournalProtocol#getEditLogManifest(String, String, long, boolean)
*/
public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
boolean inProgressOk) throws IOException {

View File

@ -86,7 +86,9 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
*/
private int resultCode = 0;
synchronized Journal getOrCreateJournal(String jid, StartupOption startOpt)
synchronized Journal getOrCreateJournal(String jid,
String nameServiceId,
StartupOption startOpt)
throws IOException {
QuorumJournalManager.checkJournalId(jid);
@ -101,22 +103,46 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
if (conf.getBoolean(
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_DEFAULT)) {
startSyncer(journal, jid);
startSyncer(journal, jid, nameServiceId);
}
} else if (journalSyncersById.get(jid) != null &&
!journalSyncersById.get(jid).isJournalSyncerStarted() &&
!journalsById.get(jid).getTriedJournalSyncerStartedwithnsId() &&
nameServiceId != null) {
startSyncer(journal, jid, nameServiceId);
}
return journal;
}
private void startSyncer(Journal journal, String jid) {
JournalNodeSyncer jSyncer = new JournalNodeSyncer(this, journal, jid, conf);
journalSyncersById.put(jid, jSyncer);
jSyncer.start();
@VisibleForTesting
public boolean getJournalSyncerStatus(String jid) {
if (journalSyncersById.get(jid) != null) {
return journalSyncersById.get(jid).isJournalSyncerStarted();
} else {
return false;
}
}
private void startSyncer(Journal journal, String jid, String nameServiceId) {
JournalNodeSyncer jSyncer = journalSyncersById.get(jid);
if (jSyncer == null) {
jSyncer = new JournalNodeSyncer(this, journal, jid, conf, nameServiceId);
journalSyncersById.put(jid, jSyncer);
}
jSyncer.start(nameServiceId);
}
@VisibleForTesting
public Journal getOrCreateJournal(String jid) throws IOException {
return getOrCreateJournal(jid, StartupOption.REGULAR);
return getOrCreateJournal(jid, null, StartupOption.REGULAR);
}
public Journal getOrCreateJournal(String jid,
String nameServiceId)
throws IOException {
return getOrCreateJournal(jid, nameServiceId, StartupOption.REGULAR);
}
@Override
@ -357,26 +383,40 @@ public class JournalNode implements Tool, Configurable, JournalNodeMXBean {
getOrCreateJournal(journalId).doUpgrade(sInfo);
}
public void doFinalize(String journalId) throws IOException {
getOrCreateJournal(journalId).doFinalize();
public void doFinalize(String journalId,
String nameServiceId)
throws IOException {
getOrCreateJournal(journalId, nameServiceId).doFinalize();
}
public Boolean canRollBack(String journalId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion) throws IOException {
return getOrCreateJournal(journalId, StartupOption.ROLLBACK).canRollBack(
StorageInfo prevStorage, int targetLayoutVersion,
String nameServiceId) throws IOException {
return getOrCreateJournal(journalId,
nameServiceId, StartupOption.ROLLBACK).canRollBack(
storage, prevStorage, targetLayoutVersion);
}
public void doRollback(String journalId) throws IOException {
getOrCreateJournal(journalId, StartupOption.ROLLBACK).doRollback();
public void doRollback(String journalId,
String nameServiceId) throws IOException {
getOrCreateJournal(journalId,
nameServiceId, StartupOption.ROLLBACK).doRollback();
}
public void discardSegments(String journalId, long startTxId)
public void discardSegments(String journalId, long startTxId,
String nameServiceId)
throws IOException {
getOrCreateJournal(journalId).discardSegments(startTxId);
getOrCreateJournal(journalId, nameServiceId).discardSegments(startTxId);
}
public Long getJournalCTime(String journalId) throws IOException {
return getOrCreateJournal(journalId).getJournalCTime();
public Long getJournalCTime(String journalId,
String nameServiceId) throws IOException {
return getOrCreateJournal(journalId, nameServiceId).getJournalCTime();
}
@VisibleForTesting
public Journal getJournal(String jid) {
return journalsById.get(jid);
}
}

View File

@ -49,10 +49,10 @@ import org.apache.hadoop.net.NetUtils;
import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
@InterfaceAudience.Private
@VisibleForTesting
public class JournalNodeRpcServer implements QJournalProtocol {
private static final int HANDLER_COUNT = 5;
private final JournalNode jn;
private Server server;
@ -117,15 +117,18 @@ public class JournalNodeRpcServer implements QJournalProtocol {
}
@Override
public boolean isFormatted(String journalId) throws IOException {
return jn.getOrCreateJournal(journalId).isFormatted();
public boolean isFormatted(String journalId,
String nameServiceId) throws IOException {
return jn.getOrCreateJournal(journalId, nameServiceId).isFormatted();
}
@SuppressWarnings("deprecation")
@Override
public GetJournalStateResponseProto getJournalState(String journalId)
public GetJournalStateResponseProto getJournalState(String journalId,
String nameServiceId)
throws IOException {
long epoch = jn.getOrCreateJournal(journalId).getLastPromisedEpoch();
long epoch = jn.getOrCreateJournal(journalId,
nameServiceId).getLastPromisedEpoch();
return GetJournalStateResponseProto.newBuilder()
.setLastPromisedEpoch(epoch)
.setHttpPort(jn.getBoundHttpAddress().getPort())
@ -135,59 +138,64 @@ public class JournalNodeRpcServer implements QJournalProtocol {
@Override
public NewEpochResponseProto newEpoch(String journalId,
NamespaceInfo nsInfo,
String nameServiceId,
NamespaceInfo nsInfo,
long epoch) throws IOException {
return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch);
return jn.getOrCreateJournal(journalId,
nameServiceId).newEpoch(nsInfo, epoch);
}
@Override
public void format(String journalId, NamespaceInfo nsInfo)
public void format(String journalId,
String nameServiceId,
NamespaceInfo nsInfo)
throws IOException {
jn.getOrCreateJournal(journalId).format(nsInfo);
jn.getOrCreateJournal(journalId, nameServiceId).format(nsInfo);
}
@Override
public void journal(RequestInfo reqInfo,
long segmentTxId, long firstTxnId,
int numTxns, byte[] records) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.journal(reqInfo, segmentTxId, firstTxnId, numTxns, records);
}
@Override
public void heartbeat(RequestInfo reqInfo) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.heartbeat(reqInfo);
}
@Override
public void startLogSegment(RequestInfo reqInfo, long txid, int layoutVersion)
throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.startLogSegment(reqInfo, txid, layoutVersion);
}
@Override
public void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
long endTxId) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.finalizeLogSegment(reqInfo, startTxId, endTxId);
}
@Override
public void purgeLogsOlderThan(RequestInfo reqInfo, long minTxIdToKeep)
throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.purgeLogsOlderThan(reqInfo, minTxIdToKeep);
}
@SuppressWarnings("deprecation")
@Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
public GetEditLogManifestResponseProto getEditLogManifest(
String jid, String nameServiceId,
long sinceTxId, boolean inProgressOk)
throws IOException {
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid, nameServiceId)
.getEditLogManifest(sinceTxId, inProgressOk);
return GetEditLogManifestResponseProto.newBuilder()
@ -200,14 +208,15 @@ public class JournalNodeRpcServer implements QJournalProtocol {
@Override
public PrepareRecoveryResponseProto prepareRecovery(RequestInfo reqInfo,
long segmentTxId) throws IOException {
return jn.getOrCreateJournal(reqInfo.getJournalId())
return jn.getOrCreateJournal(reqInfo.getJournalId(),
reqInfo.getNameServiceId())
.prepareRecovery(reqInfo, segmentTxId);
}
@Override
public void acceptRecovery(RequestInfo reqInfo, SegmentStateProto log,
URL fromUrl) throws IOException {
jn.getOrCreateJournal(reqInfo.getJournalId())
jn.getOrCreateJournal(reqInfo.getJournalId(), reqInfo.getNameServiceId())
.acceptRecovery(reqInfo, log, fromUrl);
}
@ -222,30 +231,36 @@ public class JournalNodeRpcServer implements QJournalProtocol {
}
@Override
public void doFinalize(String journalId) throws IOException {
jn.doFinalize(journalId);
public void doFinalize(String journalId,
String nameServiceId) throws IOException {
jn.doFinalize(journalId, nameServiceId);
}
@Override
public Boolean canRollBack(String journalId, StorageInfo storage,
public Boolean canRollBack(String journalId,
String nameServiceId, StorageInfo storage,
StorageInfo prevStorage, int targetLayoutVersion)
throws IOException {
return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion);
return jn.canRollBack(journalId, storage, prevStorage, targetLayoutVersion,
nameServiceId);
}
@Override
public void doRollback(String journalId) throws IOException {
jn.doRollback(journalId);
public void doRollback(String journalId,
String nameServiceId) throws IOException {
jn.doRollback(journalId, nameServiceId);
}
@Override
public void discardSegments(String journalId, long startTxId)
public void discardSegments(String journalId,
String nameServiceId, long startTxId)
throws IOException {
jn.discardSegments(journalId, startTxId);
jn.discardSegments(journalId, startTxId, nameServiceId);
}
@Override
public Long getJournalCTime(String journalId) throws IOException {
return jn.getJournalCTime(journalId);
public Long getJournalCTime(String journalId,
String nameServiceId) throws IOException {
return jn.getJournalCTime(journalId, nameServiceId);
}
}

View File

@ -65,6 +65,7 @@ public class JournalNodeSyncer {
private final JournalNode jn;
private final Journal journal;
private final String jid;
private String nameServiceId;
private final JournalIdProto jidProto;
private final JNStorage jnStorage;
private final Configuration conf;
@ -78,12 +79,14 @@ public class JournalNodeSyncer {
private final int logSegmentTransferTimeout;
private final DataTransferThrottler throttler;
private final JournalMetrics metrics;
private boolean journalSyncerStarted;
JournalNodeSyncer(JournalNode jouranlNode, Journal journal, String jid,
Configuration conf) {
Configuration conf, String nameServiceId) {
this.jn = jouranlNode;
this.journal = journal;
this.jid = jid;
this.nameServiceId = nameServiceId;
this.jidProto = convertJournalId(this.jid);
this.jnStorage = journal.getStorage();
this.conf = conf;
@ -95,6 +98,7 @@ public class JournalNodeSyncer {
DFSConfigKeys.DFS_EDIT_LOG_TRANSFER_TIMEOUT_DEFAULT);
throttler = getThrottler(conf);
metrics = journal.getMetrics();
journalSyncerStarted = false;
}
void stopSync() {
@ -109,13 +113,21 @@ public class JournalNodeSyncer {
}
}
public void start() {
LOG.info("Starting SyncJournal daemon for journal " + jid);
if (getOtherJournalNodeProxies()) {
startSyncJournalsDaemon();
} else {
LOG.warn("Failed to start SyncJournal daemon for journal " + jid);
public void start(String nsId) {
if (nsId != null) {
this.nameServiceId = nsId;
journal.setTriedJournalSyncerStartedwithnsId(true);
}
if (!journalSyncerStarted && getOtherJournalNodeProxies()) {
LOG.info("Starting SyncJournal daemon for journal " + jid);
startSyncJournalsDaemon();
journalSyncerStarted = true;
}
}
public boolean isJournalSyncerStarted() {
return journalSyncerStarted;
}
private boolean createEditsSyncDir() {

View File

@ -1805,8 +1805,20 @@ public class FSEditLog implements LogsPurgeable {
try {
Constructor<? extends JournalManager> cons
= clazz.getConstructor(Configuration.class, URI.class,
NamespaceInfo.class, String.class);
String nameServiceId = conf.get(DFSConfigKeys.DFS_NAMESERVICE_ID);
return cons.newInstance(conf, uri, storage.getNamespaceInfo(),
nameServiceId);
} catch (NoSuchMethodException ne) {
try {
Constructor<? extends JournalManager> cons
= clazz.getConstructor(Configuration.class, URI.class,
NamespaceInfo.class);
return cons.newInstance(conf, uri, storage.getNamespaceInfo());
return cons.newInstance(conf, uri, storage.getNamespaceInfo());
} catch (Exception e) {
throw new IllegalArgumentException("Unable to construct journal, "
+ uri, e);
}
} catch (Exception e) {
throw new IllegalArgumentException("Unable to construct journal, "
+ uri, e);

View File

@ -46,6 +46,7 @@ message RequestInfoProto {
// request itself, eg in the case that the node has
// fallen behind.
optional uint64 committedTxId = 4;
optional string nameServiceId = 5;
}
message SegmentStateProto {
@ -73,6 +74,7 @@ message JournalRequestProto {
required uint32 numTxns = 3;
required bytes records = 4;
required uint64 segmentTxnId = 5;
optional string nameServiceId = 6;
}
message JournalResponseProto {
@ -129,6 +131,7 @@ message PurgeLogsResponseProto {
*/
message IsFormattedRequestProto {
required JournalIdProto jid = 1;
optional string nameServiceId = 2;
}
message IsFormattedResponseProto {
@ -140,6 +143,7 @@ message IsFormattedResponseProto {
*/
message GetJournalCTimeRequestProto {
required JournalIdProto jid = 1;
optional string nameServiceId = 2;
}
message GetJournalCTimeResponseProto {
@ -172,6 +176,7 @@ message DoUpgradeResponseProto {
*/
message DoFinalizeRequestProto {
required JournalIdProto jid = 1;
optional string nameServiceId = 2;
}
message DoFinalizeResponseProto {
@ -185,6 +190,7 @@ message CanRollBackRequestProto {
required StorageInfoProto storage = 2;
required StorageInfoProto prevStorage = 3;
required int32 targetLayoutVersion = 4;
optional string nameServiceId = 5;
}
message CanRollBackResponseProto {
@ -196,6 +202,7 @@ message CanRollBackResponseProto {
*/
message DoRollbackRequestProto {
required JournalIdProto jid = 1;
optional string nameserviceId = 2;
}
message DoRollbackResponseProto {
@ -207,6 +214,7 @@ message DoRollbackResponseProto {
message DiscardSegmentsRequestProto {
required JournalIdProto jid = 1;
required uint64 startTxId = 2;
optional string nameServiceId = 3;
}
message DiscardSegmentsResponseProto {
@ -217,6 +225,7 @@ message DiscardSegmentsResponseProto {
*/
message GetJournalStateRequestProto {
required JournalIdProto jid = 1;
optional string nameServiceId = 2;
}
message GetJournalStateResponseProto {
@ -232,6 +241,7 @@ message GetJournalStateResponseProto {
message FormatRequestProto {
required JournalIdProto jid = 1;
required NamespaceInfoProto nsInfo = 2;
optional string nameServiceId = 3;
}
message FormatResponseProto {
@ -244,6 +254,7 @@ message NewEpochRequestProto {
required JournalIdProto jid = 1;
required NamespaceInfoProto nsInfo = 2;
required uint64 epoch = 3;
optional string nameServiceId = 4;
}
message NewEpochResponseProto {
@ -259,6 +270,7 @@ message GetEditLogManifestRequestProto {
// Whether or not the client will be reading from the returned streams.
// optional bool forReading = 3 [default = true]; <obsolete, do not reuse>
optional bool inProgressOk = 4 [default = false];
optional string nameServiceId = 5;
}
message GetEditLogManifestResponseProto {

View File

@ -107,9 +107,9 @@ public class TestEpochsAreUnique {
private class FaultyLoggerFactory implements AsyncLogger.Factory {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
String journalId, String nameServiceId, InetSocketAddress addr) {
AsyncLogger ch = IPCLoggerChannel.FACTORY.createLogger(
conf, nsInfo, journalId, addr);
conf, nsInfo, journalId, nameServiceId, addr);
AsyncLogger spy = Mockito.spy(ch);
Mockito.doAnswer(new SometimesFaulty<Long>(0.10f))
.when(spy).getJournalState();

View File

@ -61,7 +61,6 @@ import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
@ -440,9 +439,14 @@ public class TestQJMWithFaults {
new WrapEveryCall<Object>(realProxy) {
void beforeCall(InvocationOnMock invocation) throws Exception {
rpcCount++;
String param="";
for (Object val : invocation.getArguments()) {
param += val +",";
}
String callStr = "[" + addr + "] " +
invocation.getMethod().getName() + "(" +
Joiner.on(", ").join(invocation.getArguments()) + ")";
param + ")";
Callable<Void> inject = injections.get(rpcCount);
if (inject != null) {
@ -505,7 +509,7 @@ public class TestQJMWithFaults {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
String journalId, String nameserviceId, InetSocketAddress addr) {
return new InvocationCountingChannel(conf, nsInfo, journalId, addr);
}
};
@ -520,7 +524,7 @@ public class TestQJMWithFaults {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
String journalId, String nameServiceId, InetSocketAddress addr) {
return new RandomFaultyChannel(conf, nsInfo, journalId, addr,
seedGenerator.nextLong());
}

View File

@ -940,8 +940,9 @@ public class TestQuorumJournalManager {
AsyncLogger.Factory spyFactory = new AsyncLogger.Factory() {
@Override
public AsyncLogger createLogger(Configuration conf, NamespaceInfo nsInfo,
String journalId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId, addr) {
String journalId, String nameServiceId, InetSocketAddress addr) {
AsyncLogger logger = new IPCLoggerChannel(conf, nsInfo, journalId,
nameServiceId, addr) {
protected ExecutorService createSingleThreadExecutor() {
// Don't parallelize calls to the quorum in the tests.
// This makes the tests more deterministic.

View File

@ -156,12 +156,12 @@ public class TestJournal {
journal.startLogSegment(makeRI(1), 1,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
// Send txids 1-3, with a request indicating only 0 committed
journal.journal(new RequestInfo(JID, 1, 2, 0), 1, 1, 3,
journal.journal(new RequestInfo(JID, null, 1, 2, 0), 1, 1, 3,
QJMTestUtil.createTxnData(1, 3));
assertEquals(0, journal.getCommittedTxnId());
// Send 4-6, with request indicating that through 3 is committed.
journal.journal(new RequestInfo(JID, 1, 3, 3), 1, 4, 3,
journal.journal(new RequestInfo(JID, null, 1, 3, 3), 1, 4, 3,
QJMTestUtil.createTxnData(4, 6));
assertEquals(3, journal.getCommittedTxnId());
}
@ -195,7 +195,7 @@ public class TestJournal {
@Test (timeout = 10000)
public void testFormatResetsCachedValues() throws Exception {
journal.newEpoch(FAKE_NSINFO, 12345L);
journal.startLogSegment(new RequestInfo(JID, 12345L, 1L, 0L), 1L,
journal.startLogSegment(new RequestInfo(JID, null, 12345L, 1L, 0L), 1L,
NameNodeLayoutVersion.CURRENT_LAYOUT_VERSION);
assertEquals(12345L, journal.getLastPromisedEpoch());
@ -404,7 +404,7 @@ public class TestJournal {
}
private static RequestInfo makeRI(int serial) {
return new RequestInfo(JID, 1, serial, 0);
return new RequestInfo(JID, null, 1, serial, 0);
}
@Test (timeout = 10000)

View File

@ -24,6 +24,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.File;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URL;
import java.util.concurrent.ExecutionException;
@ -43,24 +44,30 @@ import org.apache.hadoop.hdfs.server.namenode.NameNodeLayoutVersion;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.PathUtils;
import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.StopWatch;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import com.google.common.base.Charsets;
import com.google.common.primitives.Bytes;
import com.google.common.primitives.Ints;
import org.junit.rules.TestName;
import org.mockito.Mockito;
public class TestJournalNode {
private static final NamespaceInfo FAKE_NSINFO = new NamespaceInfo(
12345, "mycluster", "my-bp", 0L);
@Rule
public TestName testName = new TestName();
private static final File TEST_BUILD_DATA = PathUtils.getTestDir(TestJournalNode.class);
@ -85,6 +92,21 @@ public class TestJournalNode {
editsDir.getAbsolutePath());
conf.set(DFSConfigKeys.DFS_JOURNALNODE_RPC_ADDRESS_KEY,
"0.0.0.0:0");
if (testName.getMethodName().equals(
"testJournalNodeSyncerNotStartWhenSyncDisabled")) {
conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY,
false);
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
"qjournal://jn0:9900;jn1:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI")) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
"qjournal://journal0\\:9900;journal1:9901");
} else if (testName.getMethodName().equals(
"testJournalNodeSyncerNotStartWhenSyncEnabled")) {
conf.set(DFSConfigKeys.DFS_NAMENODE_SHARED_EDITS_DIR_KEY,
"qjournal://jn0:9900;jn1:9901");
}
jn = new JournalNode();
jn.setConf(conf);
jn.start();
@ -363,4 +385,79 @@ public class TestJournalNode {
Mockito.verify(jNode).stop(1);
}
@Test
public void testJournalNodeSyncerNotStartWhenSyncDisabled()
throws IOException{
//JournalSyncer will not be started, as journalsync is not enabled
conf.setBoolean(DFSConfigKeys.DFS_JOURNALNODE_ENABLE_SYNC_KEY, false);
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
//Trying by passing nameserviceId still journalnodesyncer should not start
// IstriedJournalSyncerStartWithnsId should also be false
jn.getOrCreateJournal(journalId, "mycluster");
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}
@Test
public void testJournalNodeSyncerNotStartWhenSyncEnabledIncorrectURI()
throws IOException{
//JournalSyncer will not be started,
// as shared edits hostnames are not resolved
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
//Trying by passing nameserviceId, now
// IstriedJournalSyncerStartWithnsId should be set
// but journalnode syncer will not be started,
// as hostnames are not resolved
jn.getOrCreateJournal(journalId, "mycluster");
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}
@Test
public void testJournalNodeSyncerNotStartWhenSyncEnabled()
throws IOException{
//JournalSyncer will not be started,
// as shared edits hostnames are not resolved
jn.getOrCreateJournal(journalId);
Assert.assertEquals(false,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(false,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
//Trying by passing nameserviceId and resolve hostnames
// now IstriedJournalSyncerStartWithnsId should be set
// and also journalnode syncer will also be started
setupStaticHostResolution(2, "jn");
jn.getOrCreateJournal(journalId, "mycluster");
Assert.assertEquals(true,
jn.getJournalSyncerStatus(journalId));
Assert.assertEquals(true,
jn.getJournal(journalId).getTriedJournalSyncerStartedwithnsId());
}
private void setupStaticHostResolution(int nameServiceIdCount,
String hostname) {
for (int i = 0; i < nameServiceIdCount; i++) {
NetUtils.addStaticResolution(hostname + i,
"localhost");
}
}
}

View File

@ -96,6 +96,13 @@ public class TestJournalNodeSync {
@Test(timeout=30000)
public void testJournalNodeSync() throws Exception {
//As by default 3 journal nodes are started;
for(int i=0; i<3; i++) {
Assert.assertEquals(true,
jCluster.getJournalNode(i).getJournalSyncerStatus("ns1"));
}
File firstJournalDir = jCluster.getJournalDir(0, jid);
File firstJournalCurrentDir = new StorageDirectory(firstJournalDir)
.getCurrentDir();