svn merge -c 1310649 from trunk for HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo and epoch in JournalProtocol.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1337423 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
2aec5f3bc5
commit
c0795cd107
|
@ -315,6 +315,9 @@ Release 2.0.0 - UNRELEASED
|
||||||
HDFS-3400. DNs should be able start with jsvc even if security is disabled.
|
HDFS-3400. DNs should be able start with jsvc even if security is disabled.
|
||||||
(atm via eli)
|
(atm via eli)
|
||||||
|
|
||||||
|
HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo
|
||||||
|
and epoch in JournalProtocol. (suresh via szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HDFS-2477. Optimize computing the diff between a block report and the
|
HDFS-2477. Optimize computing the diff between a block report and the
|
||||||
|
|
|
@ -22,6 +22,7 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,6 +34,10 @@ import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
|
||||||
public class UnregisteredNodeException extends IOException {
|
public class UnregisteredNodeException extends IOException {
|
||||||
private static final long serialVersionUID = -5620209396945970810L;
|
private static final long serialVersionUID = -5620209396945970810L;
|
||||||
|
|
||||||
|
public UnregisteredNodeException(JournalInfo info) {
|
||||||
|
super("Unregistered server: " + info.toString());
|
||||||
|
}
|
||||||
|
|
||||||
public UnregisteredNodeException(NodeRegistration nodeReg) {
|
public UnregisteredNodeException(NodeRegistration nodeReg) {
|
||||||
super("Unregistered server: " + nodeReg.toString());
|
super("Unregistered server: " + nodeReg.toString());
|
||||||
}
|
}
|
||||||
|
|
|
@ -20,10 +20,13 @@ package org.apache.hadoop.hdfs.protocolPB;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
||||||
|
|
||||||
import com.google.protobuf.RpcController;
|
import com.google.protobuf.RpcController;
|
||||||
|
@ -48,9 +51,8 @@ public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB
|
||||||
public JournalResponseProto journal(RpcController unused,
|
public JournalResponseProto journal(RpcController unused,
|
||||||
JournalRequestProto req) throws ServiceException {
|
JournalRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
impl.journal(PBHelper.convert(req.getJournalInfo()),
|
impl.journal(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
|
||||||
req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
|
req.getFirstTxnId(), req.getNumTxns(), req.getRecords().toByteArray());
|
||||||
.toByteArray());
|
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
|
@ -63,10 +65,24 @@ public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB
|
||||||
StartLogSegmentRequestProto req) throws ServiceException {
|
StartLogSegmentRequestProto req) throws ServiceException {
|
||||||
try {
|
try {
|
||||||
impl.startLogSegment(PBHelper.convert(req.getJournalInfo()),
|
impl.startLogSegment(PBHelper.convert(req.getJournalInfo()),
|
||||||
req.getTxid());
|
req.getEpoch(), req.getTxid());
|
||||||
} catch (IOException e) {
|
} catch (IOException e) {
|
||||||
throw new ServiceException(e);
|
throw new ServiceException(e);
|
||||||
}
|
}
|
||||||
return StartLogSegmentResponseProto.newBuilder().build();
|
return StartLogSegmentResponseProto.newBuilder().build();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FenceResponseProto fence(RpcController controller,
|
||||||
|
FenceRequestProto req) throws ServiceException {
|
||||||
|
try {
|
||||||
|
FenceResponse resp = impl.fence(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
|
||||||
|
req.getFencerInfo());
|
||||||
|
return FenceResponseProto.newBuilder().setInSync(resp.isInSync())
|
||||||
|
.setLastTransactionId(resp.getLastTransactionId())
|
||||||
|
.setPreviousEpoch(resp.getPreviousEpoch()).build();
|
||||||
|
} catch (IOException e) {
|
||||||
|
throw new ServiceException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,10 +22,13 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
|
||||||
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
|
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
|
||||||
import org.apache.hadoop.ipc.ProtobufHelper;
|
import org.apache.hadoop.ipc.ProtobufHelper;
|
||||||
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
import org.apache.hadoop.ipc.ProtocolMetaInterface;
|
||||||
import org.apache.hadoop.ipc.RPC;
|
import org.apache.hadoop.ipc.RPC;
|
||||||
|
@ -58,10 +61,11 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void journal(NamenodeRegistration reg, long firstTxnId,
|
public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
|
||||||
int numTxns, byte[] records) throws IOException {
|
int numTxns, byte[] records) throws IOException {
|
||||||
JournalRequestProto req = JournalRequestProto.newBuilder()
|
JournalRequestProto req = JournalRequestProto.newBuilder()
|
||||||
.setJournalInfo(PBHelper.convertToJournalInfo(reg))
|
.setJournalInfo(PBHelper.convert(journalInfo))
|
||||||
|
.setEpoch(epoch)
|
||||||
.setFirstTxnId(firstTxnId)
|
.setFirstTxnId(firstTxnId)
|
||||||
.setNumTxns(numTxns)
|
.setNumTxns(numTxns)
|
||||||
.setRecords(PBHelper.getByteString(records))
|
.setRecords(PBHelper.getByteString(records))
|
||||||
|
@ -74,10 +78,11 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startLogSegment(NamenodeRegistration registration, long txid)
|
public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
|
StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
|
||||||
.setJournalInfo(PBHelper.convertToJournalInfo(registration))
|
.setJournalInfo(PBHelper.convert(journalInfo))
|
||||||
|
.setEpoch(epoch)
|
||||||
.setTxid(txid)
|
.setTxid(txid)
|
||||||
.build();
|
.build();
|
||||||
try {
|
try {
|
||||||
|
@ -87,6 +92,20 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FenceResponse fence(JournalInfo journalInfo, long epoch,
|
||||||
|
String fencerInfo) throws IOException {
|
||||||
|
FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
|
||||||
|
.setJournalInfo(PBHelper.convert(journalInfo)).build();
|
||||||
|
try {
|
||||||
|
FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req);
|
||||||
|
return new FenceResponse(resp.getPreviousEpoch(),
|
||||||
|
resp.getLastTransactionId(), resp.getInSync());
|
||||||
|
} catch (ServiceException e) {
|
||||||
|
throw ProtobufHelper.getRemoteException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean isMethodSupported(String methodName) throws IOException {
|
public boolean isMethodSupported(String methodName) throws IOException {
|
||||||
return RpcClientUtil.isMethodSupported(rpcProxy, JournalProtocolPB.class,
|
return RpcClientUtil.isMethodSupported(rpcProxy, JournalProtocolPB.class,
|
||||||
|
|
|
@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
|
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
|
||||||
|
@ -117,6 +118,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
|
@ -127,7 +129,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
|
|
||||||
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
|
||||||
import org.apache.hadoop.io.EnumSetWritable;
|
import org.apache.hadoop.io.EnumSetWritable;
|
||||||
import org.apache.hadoop.io.Text;
|
import org.apache.hadoop.io.Text;
|
||||||
|
@ -1349,25 +1350,19 @@ public class PBHelper {
|
||||||
.setStorageID(r.getStorageID()).build();
|
.setStorageID(r.getStorageID()).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
public static NamenodeRegistration convert(JournalInfoProto info) {
|
public static JournalInfo convert(JournalInfoProto info) {
|
||||||
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
|
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
|
||||||
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
|
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
|
||||||
StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0);
|
return new JournalInfo(lv, info.getClusterID(), nsID);
|
||||||
|
|
||||||
// Note that the role is always {@link NamenodeRole#NAMENODE} as this
|
|
||||||
// conversion happens for messages from Namenode to Journal receivers.
|
|
||||||
// Addresses in the registration are unused.
|
|
||||||
return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Method used for converting {@link JournalInfoProto} sent from Namenode
|
* Method used for converting {@link JournalInfoProto} sent from Namenode
|
||||||
* to Journal receivers to {@link NamenodeRegistration}.
|
* to Journal receivers to {@link NamenodeRegistration}.
|
||||||
*/
|
*/
|
||||||
public static JournalInfoProto convertToJournalInfo(
|
public static JournalInfoProto convert(JournalInfo j) {
|
||||||
NamenodeRegistration reg) {
|
return JournalInfoProto.newBuilder().setClusterID(j.getClusterId())
|
||||||
return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID())
|
.setLayoutVersion(j.getLayoutVersion())
|
||||||
.setLayoutVersion(reg.getLayoutVersion())
|
.setNamespaceID(j.getNamespaceId()).build();
|
||||||
.setNamespaceID(reg.getNamespaceID()).build();
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -26,19 +27,20 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
* to a BackupNode.
|
* to a BackupNode.
|
||||||
*/
|
*/
|
||||||
class BackupJournalManager implements JournalManager {
|
class BackupJournalManager implements JournalManager {
|
||||||
|
|
||||||
private final NamenodeRegistration nnReg;
|
|
||||||
private final NamenodeRegistration bnReg;
|
private final NamenodeRegistration bnReg;
|
||||||
|
private final JournalInfo journalInfo;
|
||||||
|
|
||||||
BackupJournalManager(NamenodeRegistration bnReg,
|
BackupJournalManager(NamenodeRegistration bnReg,
|
||||||
NamenodeRegistration nnReg) {
|
NamenodeRegistration nnReg) {
|
||||||
|
journalInfo = new JournalInfo(nnReg.getLayoutVersion(),
|
||||||
|
nnReg.getClusterID(), nnReg.getNamespaceID());
|
||||||
this.bnReg = bnReg;
|
this.bnReg = bnReg;
|
||||||
this.nnReg = nnReg;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public EditLogOutputStream startLogSegment(long txId) throws IOException {
|
public EditLogOutputStream startLogSegment(long txId) throws IOException {
|
||||||
EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg);
|
EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
|
||||||
|
journalInfo);
|
||||||
stm.startLogSegment(txId);
|
stm.startLogSegment(txId);
|
||||||
return stm;
|
return stm;
|
||||||
}
|
}
|
||||||
|
|
|
@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
|
||||||
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
|
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
|
||||||
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
|
@ -207,7 +209,8 @@ public class BackupNode extends NameNode {
|
||||||
}
|
}
|
||||||
|
|
||||||
/* @Override */// NameNode
|
/* @Override */// NameNode
|
||||||
public boolean setSafeMode(SafeModeAction action) throws IOException {
|
public boolean setSafeMode(@SuppressWarnings("unused") SafeModeAction action)
|
||||||
|
throws IOException {
|
||||||
throw new UnsupportedActionException("setSafeMode");
|
throw new UnsupportedActionException("setSafeMode");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -226,51 +229,56 @@ public class BackupNode extends NameNode {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Verifies a journal request
|
* Verifies a journal request
|
||||||
* @param nodeReg node registration
|
|
||||||
* @throws UnregisteredNodeException if the registration is invalid
|
|
||||||
*/
|
*/
|
||||||
void verifyJournalRequest(NamenodeRegistration reg) throws IOException {
|
private void verifyJournalRequest(JournalInfo journalInfo)
|
||||||
verifyLayoutVersion(reg.getLayoutVersion());
|
throws IOException {
|
||||||
|
verifyLayoutVersion(journalInfo.getLayoutVersion());
|
||||||
String errorMsg = null;
|
String errorMsg = null;
|
||||||
int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
|
int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
|
||||||
if (reg.getNamespaceID() != expectedNamespaceID) {
|
if (journalInfo.getNamespaceId() != expectedNamespaceID) {
|
||||||
errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
|
errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
|
||||||
+ " actual " + reg.getNamespaceID();
|
+ " actual " + journalInfo.getNamespaceId();
|
||||||
LOG.warn(errorMsg);
|
LOG.warn(errorMsg);
|
||||||
throw new UnregisteredNodeException(reg);
|
throw new UnregisteredNodeException(journalInfo);
|
||||||
}
|
}
|
||||||
if (!reg.getClusterID().equals(namesystem.getClusterId())) {
|
if (!journalInfo.getClusterId().equals(namesystem.getClusterId())) {
|
||||||
errorMsg = "Invalid clusterId in journal request - expected "
|
errorMsg = "Invalid clusterId in journal request - expected "
|
||||||
+ reg.getClusterID() + " actual " + namesystem.getClusterId();
|
+ journalInfo.getClusterId() + " actual " + namesystem.getClusterId();
|
||||||
LOG.warn(errorMsg);
|
LOG.warn(errorMsg);
|
||||||
throw new UnregisteredNodeException(reg);
|
throw new UnregisteredNodeException(journalInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
// BackupNodeProtocol implementation for backup node.
|
// BackupNodeProtocol implementation for backup node.
|
||||||
/////////////////////////////////////////////////////
|
/////////////////////////////////////////////////////
|
||||||
@Override
|
@Override
|
||||||
public void startLogSegment(NamenodeRegistration registration, long txid)
|
public void startLogSegment(JournalInfo journalInfo, long epoch,
|
||||||
throws IOException {
|
long txid) throws IOException {
|
||||||
namesystem.checkOperation(OperationCategory.JOURNAL);
|
namesystem.checkOperation(OperationCategory.JOURNAL);
|
||||||
verifyJournalRequest(registration);
|
verifyJournalRequest(journalInfo);
|
||||||
getBNImage().namenodeStartedLogSegment(txid);
|
getBNImage().namenodeStartedLogSegment(txid);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void journal(NamenodeRegistration nnReg,
|
public void journal(JournalInfo journalInfo, long epoch, long firstTxId,
|
||||||
long firstTxId, int numTxns,
|
int numTxns, byte[] records) throws IOException {
|
||||||
byte[] records) throws IOException {
|
|
||||||
namesystem.checkOperation(OperationCategory.JOURNAL);
|
namesystem.checkOperation(OperationCategory.JOURNAL);
|
||||||
verifyJournalRequest(nnReg);
|
verifyJournalRequest(journalInfo);
|
||||||
getBNImage().journal(firstTxId, numTxns, records);
|
getBNImage().journal(firstTxId, numTxns, records);
|
||||||
}
|
}
|
||||||
|
|
||||||
private BackupImage getBNImage() {
|
private BackupImage getBNImage() {
|
||||||
return (BackupImage)nn.getFSImage();
|
return (BackupImage)nn.getFSImage();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public FenceResponse fence(JournalInfo journalInfo, long epoch,
|
||||||
|
String fencerInfo) throws IOException {
|
||||||
|
LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
|
||||||
|
throw new UnsupportedOperationException(
|
||||||
|
"BackupNode does not support fence");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
//////////////////////////////////////////////////////
|
//////////////////////////////////////////////////////
|
||||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Arrays;
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.NameNodeProxies;
|
import org.apache.hadoop.hdfs.NameNodeProxies;
|
||||||
import org.apache.hadoop.hdfs.server.common.Storage;
|
import org.apache.hadoop.hdfs.server.common.Storage;
|
||||||
|
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
|
||||||
import org.apache.hadoop.io.DataOutputBuffer;
|
import org.apache.hadoop.io.DataOutputBuffer;
|
||||||
|
@ -42,18 +43,18 @@ import org.apache.hadoop.security.UserGroupInformation;
|
||||||
class EditLogBackupOutputStream extends EditLogOutputStream {
|
class EditLogBackupOutputStream extends EditLogOutputStream {
|
||||||
static int DEFAULT_BUFFER_SIZE = 256;
|
static int DEFAULT_BUFFER_SIZE = 256;
|
||||||
|
|
||||||
private JournalProtocol backupNode; // RPC proxy to backup node
|
private final JournalProtocol backupNode; // RPC proxy to backup node
|
||||||
private NamenodeRegistration bnRegistration; // backup node registration
|
private final NamenodeRegistration bnRegistration; // backup node registration
|
||||||
private NamenodeRegistration nnRegistration; // active node registration
|
private final JournalInfo journalInfo; // active node registration
|
||||||
|
private final DataOutputBuffer out; // serialized output sent to backup node
|
||||||
private EditsDoubleBuffer doubleBuf;
|
private EditsDoubleBuffer doubleBuf;
|
||||||
private DataOutputBuffer out; // serialized output sent to backup node
|
|
||||||
|
|
||||||
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
|
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
|
||||||
NamenodeRegistration nnReg) // active name-node
|
JournalInfo journalInfo) // active name-node
|
||||||
throws IOException {
|
throws IOException {
|
||||||
super();
|
super();
|
||||||
this.bnRegistration = bnReg;
|
this.bnRegistration = bnReg;
|
||||||
this.nnRegistration = nnReg;
|
this.journalInfo = journalInfo;
|
||||||
InetSocketAddress bnAddress =
|
InetSocketAddress bnAddress =
|
||||||
NetUtils.createSocketAddr(bnRegistration.getAddress());
|
NetUtils.createSocketAddr(bnRegistration.getAddress());
|
||||||
try {
|
try {
|
||||||
|
@ -127,8 +128,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
|
||||||
out.reset();
|
out.reset();
|
||||||
assert out.getLength() == 0 : "Output buffer is not empty";
|
assert out.getLength() == 0 : "Output buffer is not empty";
|
||||||
|
|
||||||
backupNode.journal(nnRegistration,
|
backupNode.journal(journalInfo, 0, firstTxToFlush, numReadyTxns, data);
|
||||||
firstTxToFlush, numReadyTxns, data);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -140,6 +140,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
|
||||||
}
|
}
|
||||||
|
|
||||||
void startLogSegment(long txId) throws IOException {
|
void startLogSegment(long txId) throws IOException {
|
||||||
backupNode.startLogSegment(nnRegistration, txId);
|
backupNode.startLogSegment(journalInfo, 0, txId);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -0,0 +1,48 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.protocol;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Response to a journal fence request. See {@link JournalProtocol#fence}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class FenceResponse {
|
||||||
|
private final long previousEpoch;
|
||||||
|
private final long lastTransactionId;
|
||||||
|
private final boolean isInSync;
|
||||||
|
|
||||||
|
public FenceResponse(long previousEpoch, long lastTransId, boolean inSync) {
|
||||||
|
this.previousEpoch = previousEpoch;
|
||||||
|
this.lastTransactionId = lastTransId;
|
||||||
|
this.isInSync = inSync;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isInSync() {
|
||||||
|
return isInSync;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getLastTransactionId() {
|
||||||
|
return lastTransactionId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getPreviousEpoch() {
|
||||||
|
return previousEpoch;
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,32 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.protocol;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If a previous user of a resource tries to use a shared resource, after
|
||||||
|
* fenced by another user, this exception is thrown.
|
||||||
|
*/
|
||||||
|
public class FencedException extends IOException {
|
||||||
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
|
public FencedException(String errorMsg) {
|
||||||
|
super(errorMsg);
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,48 @@
|
||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.hdfs.server.protocol;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Information that describes a journal
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
public class JournalInfo {
|
||||||
|
private final int layoutVersion;
|
||||||
|
private final String clusterId;
|
||||||
|
private final int namespaceId;
|
||||||
|
|
||||||
|
public JournalInfo(int lv, String clusterId, int nsId) {
|
||||||
|
this.layoutVersion = lv;
|
||||||
|
this.clusterId = clusterId;
|
||||||
|
this.namespaceId = nsId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getLayoutVersion() {
|
||||||
|
return layoutVersion;
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getClusterId() {
|
||||||
|
return clusterId;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getNamespaceId() {
|
||||||
|
return namespaceId;
|
||||||
|
}
|
||||||
|
}
|
|
@ -21,7 +21,6 @@ import java.io.IOException;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.ipc.VersionedProtocol;
|
|
||||||
import org.apache.hadoop.security.KerberosInfo;
|
import org.apache.hadoop.security.KerberosInfo;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -53,12 +52,15 @@ public interface JournalProtocol {
|
||||||
* via {@code EditLogBackupOutputStream} in order to synchronize meta-data
|
* via {@code EditLogBackupOutputStream} in order to synchronize meta-data
|
||||||
* changes with the backup namespace image.
|
* changes with the backup namespace image.
|
||||||
*
|
*
|
||||||
* @param registration active node registration
|
* @param journalInfo journal information
|
||||||
|
* @param epoch marks beginning a new journal writer
|
||||||
* @param firstTxnId the first transaction of this batch
|
* @param firstTxnId the first transaction of this batch
|
||||||
* @param numTxns number of transactions
|
* @param numTxns number of transactions
|
||||||
* @param records byte array containing serialized journal records
|
* @param records byte array containing serialized journal records
|
||||||
|
* @throws FencedException if the resource has been fenced
|
||||||
*/
|
*/
|
||||||
public void journal(NamenodeRegistration registration,
|
public void journal(JournalInfo journalInfo,
|
||||||
|
long epoch,
|
||||||
long firstTxnId,
|
long firstTxnId,
|
||||||
int numTxns,
|
int numTxns,
|
||||||
byte[] records) throws IOException;
|
byte[] records) throws IOException;
|
||||||
|
@ -66,9 +68,24 @@ public interface JournalProtocol {
|
||||||
/**
|
/**
|
||||||
* Notify the BackupNode that the NameNode has rolled its edit logs
|
* Notify the BackupNode that the NameNode has rolled its edit logs
|
||||||
* and is now writing a new log segment.
|
* and is now writing a new log segment.
|
||||||
* @param registration the registration of the active NameNode
|
* @param journalInfo journal information
|
||||||
|
* @param epoch marks beginning a new journal writer
|
||||||
* @param txid the first txid in the new log
|
* @param txid the first txid in the new log
|
||||||
|
* @throws FencedException if the resource has been fenced
|
||||||
*/
|
*/
|
||||||
public void startLogSegment(NamenodeRegistration registration,
|
public void startLogSegment(JournalInfo journalInfo, long epoch,
|
||||||
long txid) throws IOException;
|
long txid) throws IOException;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request to fence any other journal writers.
|
||||||
|
* Older writers with at previous epoch will be fenced and can no longer
|
||||||
|
* perform journal operations.
|
||||||
|
*
|
||||||
|
* @param journalInfo journal information
|
||||||
|
* @param epoch marks beginning a new journal writer
|
||||||
|
* @param fencerInfo info about fencer for debugging purposes
|
||||||
|
* @throws FencedException if the resource has been fenced
|
||||||
|
*/
|
||||||
|
public FenceResponse fence(JournalInfo journalInfo, long epoch,
|
||||||
|
String fencerInfo) throws IOException;
|
||||||
}
|
}
|
||||||
|
|
|
@ -36,16 +36,18 @@ message JournalInfoProto {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JournalInfo - the information about the journal
|
* journalInfo - the information about the journal
|
||||||
* firstTxnId - the first txid in the journal records
|
* firstTxnId - the first txid in the journal records
|
||||||
* numTxns - Number of transactions in editlog
|
* numTxns - Number of transactions in editlog
|
||||||
* records - bytes containing serialized journal records
|
* records - bytes containing serialized journal records
|
||||||
|
* epoch - change to this represents change of journal writer
|
||||||
*/
|
*/
|
||||||
message JournalRequestProto {
|
message JournalRequestProto {
|
||||||
required JournalInfoProto journalInfo = 1;
|
required JournalInfoProto journalInfo = 1;
|
||||||
required uint64 firstTxnId = 2;
|
required uint64 firstTxnId = 2;
|
||||||
required uint32 numTxns = 3;
|
required uint32 numTxns = 3;
|
||||||
required bytes records = 4;
|
required bytes records = 4;
|
||||||
|
required uint64 epoch = 5;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -55,12 +57,13 @@ message JournalResponseProto {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* JournalInfo - the information about the journal
|
* journalInfo - the information about the journal
|
||||||
* txid - first txid in the new log
|
* txid - first txid in the new log
|
||||||
*/
|
*/
|
||||||
message StartLogSegmentRequestProto {
|
message StartLogSegmentRequestProto {
|
||||||
required JournalInfoProto journalInfo = 1;
|
required JournalInfoProto journalInfo = 1; // Info about the journal
|
||||||
required uint64 txid = 2;
|
required uint64 txid = 2; // Transaction ID
|
||||||
|
required uint64 epoch = 3;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -69,6 +72,27 @@ message StartLogSegmentRequestProto {
|
||||||
message StartLogSegmentResponseProto {
|
message StartLogSegmentResponseProto {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* journalInfo - the information about the journal
|
||||||
|
* txid - first txid in the new log
|
||||||
|
*/
|
||||||
|
message FenceRequestProto {
|
||||||
|
required JournalInfoProto journalInfo = 1; // Info about the journal
|
||||||
|
required uint64 epoch = 2; // Epoch - change indicates change in writer
|
||||||
|
optional string fencerInfo = 3; // Info about fencer for debugging
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* previousEpoch - previous epoch if any or zero
|
||||||
|
* lastTransactionId - last valid transaction Id in the journal
|
||||||
|
* inSync - if all journal segments are available and in sync
|
||||||
|
*/
|
||||||
|
message FenceResponseProto {
|
||||||
|
optional uint64 previousEpoch = 1;
|
||||||
|
optional uint64 lastTransactionId = 2;
|
||||||
|
optional bool inSync = 3;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Protocol used to journal edits to a remote node. Currently,
|
* Protocol used to journal edits to a remote node. Currently,
|
||||||
* this is used to publish edits from the NameNode to a BackupNode.
|
* this is used to publish edits from the NameNode to a BackupNode.
|
||||||
|
@ -89,4 +113,10 @@ service JournalProtocolService {
|
||||||
*/
|
*/
|
||||||
rpc startLogSegment(StartLogSegmentRequestProto)
|
rpc startLogSegment(StartLogSegmentRequestProto)
|
||||||
returns (StartLogSegmentResponseProto);
|
returns (StartLogSegmentResponseProto);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Request to fence a journal receiver.
|
||||||
|
*/
|
||||||
|
rpc fence(FenceRequestProto)
|
||||||
|
returns (FenceResponseProto);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue