HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo and epoch in JournalProtocol. Contributed by suresh

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1310649 13f79535-47bb-0310-9956-ffa450edef68
Author: Tsz-wo Sze
Date:   2012-04-07 00:52:12 +00:00
parent 2ab67dce11
commit 74b4b45651
15 changed files with 400 additions and 82 deletions

View File

@@ -62,9 +62,6 @@ Trunk (unreleased changes)
HDFS-3178. Add states and state handler for journal synchronization in
JournalService. (szetszwo)
-HDFS-3204. Minor modification to JournalProtocol.proto to make
-it generic. (suresh)
OPTIMIZATIONS
HDFS-2834. Add a ByteBuffer-based read API to DFSInputStream.
@@ -335,6 +332,15 @@ Release 2.0.0 - UNRELEASED
HDFS-3226. Allow GetConf tool to print arbitrary keys (todd)
+HDFS-3204. Minor modification to JournalProtocol.proto to make
+it generic. (suresh)
+HDFS-2505. Add a test to verify getFileChecksum(..) with ViewFS. (Ravi
+Prakash via szetszwo)
+HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo
+and epoch in JournalProtocol. (suresh via szetszwo)
OPTIMIZATIONS
HDFS-3024. Improve performance of stringification in addStoredBlock (todd)
@@ -771,9 +777,6 @@ Release 0.23.3 - UNRELEASED
IMPROVEMENTS
-HDFS-2505. Add a test to verify getFileChecksum(..) with ViewFS. (Ravi
-Prakash via szetszwo)
OPTIMIZATIONS
BUG FIXES

View File

@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
/**
@@ -33,6 +34,10 @@ import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
public class UnregisteredNodeException extends IOException {
private static final long serialVersionUID = -5620209396945970810L;
+public UnregisteredNodeException(JournalInfo info) {
+  super("Unregistered server: " + info.toString());
+}
public UnregisteredNodeException(NodeRegistration nodeReg) {
super("Unregistered server: " + nodeReg.toString());
}

View File

@@ -20,10 +20,13 @@ package org.apache.hadoop.hdfs.protocolPB;
import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import com.google.protobuf.RpcController;
@@ -48,9 +51,8 @@ public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB
public JournalResponseProto journal(RpcController unused,
JournalRequestProto req) throws ServiceException {
try {
-impl.journal(PBHelper.convert(req.getJournalInfo()),
-req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
-.toByteArray());
+impl.journal(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
+req.getFirstTxnId(), req.getNumTxns(), req.getRecords().toByteArray());
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -63,10 +65,24 @@ public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB
StartLogSegmentRequestProto req) throws ServiceException {
try {
impl.startLogSegment(PBHelper.convert(req.getJournalInfo()),
-req.getTxid());
+req.getEpoch(), req.getTxid());
} catch (IOException e) {
throw new ServiceException(e);
}
return StartLogSegmentResponseProto.newBuilder().build();
}
+@Override
+public FenceResponseProto fence(RpcController controller,
+FenceRequestProto req) throws ServiceException {
+try {
+FenceResponse resp = impl.fence(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
+req.getFencerInfo());
+return FenceResponseProto.newBuilder().setInSync(resp.isInSync())
+.setLastTransactionId(resp.getLastTransactionId())
+.setPreviousEpoch(resp.getPreviousEpoch()).build();
+} catch (IOException e) {
+throw new ServiceException(e);
+}
+}
}

View File

@@ -22,10 +22,13 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.ipc.ProtobufHelper;
import org.apache.hadoop.ipc.ProtocolMetaInterface;
import org.apache.hadoop.ipc.RPC;
@@ -58,10 +61,11 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
}
@Override
-public void journal(NamenodeRegistration reg, long firstTxnId,
+public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
int numTxns, byte[] records) throws IOException {
JournalRequestProto req = JournalRequestProto.newBuilder()
-.setJournalInfo(PBHelper.convertToJournalInfo(reg))
+.setJournalInfo(PBHelper.convert(journalInfo))
+.setEpoch(epoch)
.setFirstTxnId(firstTxnId)
.setNumTxns(numTxns)
.setRecords(PBHelper.getByteString(records))
@@ -74,10 +78,11 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
}
@Override
-public void startLogSegment(NamenodeRegistration registration, long txid)
+public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
throws IOException {
StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
-.setJournalInfo(PBHelper.convertToJournalInfo(registration))
+.setJournalInfo(PBHelper.convert(journalInfo))
+.setEpoch(epoch)
.setTxid(txid)
.build();
try {
@@ -86,6 +91,20 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
throw ProtobufHelper.getRemoteException(e);
}
}
+@Override
+public FenceResponse fence(JournalInfo journalInfo, long epoch,
+String fencerInfo) throws IOException {
+FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
+.setJournalInfo(PBHelper.convert(journalInfo)).build();
+try {
+FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req);
+return new FenceResponse(resp.getPreviousEpoch(),
+resp.getLastTransactionId(), resp.getInSync());
+} catch (ServiceException e) {
+throw ProtobufHelper.getRemoteException(e);
+}
+}
@Override
public boolean isMethodSupported(String methodName) throws IOException {

View File

@@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -117,6 +118,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -127,7 +129,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
import org.apache.hadoop.io.EnumSetWritable;
import org.apache.hadoop.io.Text;
@@ -1347,25 +1348,19 @@ public class PBHelper {
.setStorageID(r.getStorageID()).build();
}
-public static NamenodeRegistration convert(JournalInfoProto info) {
+public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
-StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0);
-// Note that the role is always {@link NamenodeRole#NAMENODE} as this
-// conversion happens for messages from Namenode to Journal receivers.
-// Addresses in the registration are unused.
-return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE);
+return new JournalInfo(lv, info.getClusterID(), nsID);
}
/**
* Method used for converting {@link JournalInfoProto} sent from Namenode
* to Journal receivers to {@link NamenodeRegistration}.
*/
-public static JournalInfoProto convertToJournalInfo(
-NamenodeRegistration reg) {
-return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID())
-.setLayoutVersion(reg.getLayoutVersion())
-.setNamespaceID(reg.getNamespaceID()).build();
+public static JournalInfoProto convert(JournalInfo j) {
+return JournalInfoProto.newBuilder().setClusterID(j.getClusterId())
+.setLayoutVersion(j.getLayoutVersion())
+.setNamespaceID(j.getNamespaceId()).build();
}
}

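Editorial note: the two convert(..) overloads above give a symmetric JournalInfo <-> JournalInfoProto mapping. A minimal, self-contained sketch of that round trip follows; the class name and sample values are hypothetical, while the builder and getter calls are the ones used in this patch.

import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;

public class JournalInfoRoundTripSketch {
  public static void main(String[] args) {
    // Sample values only; a real caller derives these from the namenode's storage info.
    JournalInfo info = new JournalInfo(-40, "test-cluster", 12345);

    // Java object -> wire message, mirroring PBHelper.convert(JournalInfo).
    JournalInfoProto proto = JournalInfoProto.newBuilder()
        .setClusterID(info.getClusterId())
        .setLayoutVersion(info.getLayoutVersion())
        .setNamespaceID(info.getNamespaceId())
        .build();

    // Wire message -> Java object, mirroring PBHelper.convert(JournalInfoProto);
    // the optional fields fall back to 0 when absent.
    int lv = proto.hasLayoutVersion() ? proto.getLayoutVersion() : 0;
    int nsId = proto.hasNamespaceID() ? proto.getNamespaceID() : 0;
    JournalInfo back = new JournalInfo(lv, proto.getClusterID(), nsId);

    System.out.println("Round-tripped cluster id: " + back.getClusterId());
  }
}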
View File

@@ -31,6 +31,9 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.StorageInfo;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.FencedException;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -40,6 +43,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.UserGroupInformation;
+import com.google.common.annotations.VisibleForTesting;
import com.google.protobuf.BlockingService;
/**
@@ -66,6 +70,8 @@ public class JournalService implements JournalProtocol {
private final NamenodeProtocol namenode;
private final StateHandler stateHandler = new StateHandler();
private final RPC.Server rpcServer;
+private long epoch = 0;
+private String fencerInfo;
enum State {
/** The service is initialized and ready to start. */
@@ -115,7 +121,7 @@ public class JournalService implements JournalProtocol {
current = State.WAITING_FOR_ROLL;
}
-synchronized void startLogSegment() throws IOException {
+synchronized void startLogSegment() {
if (current == State.WAITING_FOR_ROLL) {
current = State.SYNCING;
}
@@ -232,28 +238,42 @@ public class JournalService implements JournalProtocol {
}
@Override
-public void journal(NamenodeRegistration registration, long firstTxnId,
+public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
int numTxns, byte[] records) throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Received journal " + firstTxnId + " " + numTxns);
}
stateHandler.isJournalAllowed();
-verify(registration);
+verify(epoch, journalInfo);
listener.journal(this, firstTxnId, numTxns, records);
}
@Override
-public void startLogSegment(NamenodeRegistration registration, long txid)
+public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
throws IOException {
if (LOG.isTraceEnabled()) {
LOG.trace("Received startLogSegment " + txid);
}
stateHandler.isStartLogSegmentAllowed();
-verify(registration);
+verify(epoch, journalInfo);
listener.rollLogs(this, txid);
stateHandler.startLogSegment();
}
+@Override
+public FenceResponse fence(JournalInfo journalInfo, long epoch,
+String fencerInfo) throws IOException {
+LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
+verifyFence(epoch, fencerInfo);
+verify(journalInfo);
+long previousEpoch = epoch;
+this.epoch = epoch;
+this.fencerInfo = fencerInfo;
+// TODO:HDFS-3092 set lastTransId and inSync
+return new FenceResponse(previousEpoch, 0, false);
+}
/** Create an RPC server. */
private static RPC.Server createRpcServer(Configuration conf,
InetSocketAddress address, JournalProtocol impl) throws IOException {
@@ -267,15 +287,54 @@ public class JournalService implements JournalProtocol {
address.getHostName(), address.getPort(), 1, false, conf, null);
}
-private void verify(NamenodeRegistration reg) throws IOException {
-if (!registration.getRegistrationID().equals(reg.getRegistrationID())) {
-LOG.warn("Invalid registrationID - expected: "
-+ registration.getRegistrationID() + " received: "
-+ reg.getRegistrationID());
-throw new UnregisteredNodeException(reg);
+private void verifyEpoch(long e) throws FencedException {
+if (epoch != e) {
+String errorMsg = "Epoch " + e + " is not valid. "
++ "Resource has already been fenced by " + fencerInfo
++ " with epoch " + epoch;
+LOG.warn(errorMsg);
+throw new FencedException(errorMsg);
}
}
+private void verifyFence(long e, String fencer) throws FencedException {
+if (e <= epoch) {
+String errorMsg = "Epoch " + e + " from fencer " + fencer
++ " is not valid. " + "Resource has already been fenced by "
++ fencerInfo + " with epoch " + epoch;
+LOG.warn(errorMsg);
+throw new FencedException(errorMsg);
+}
+}
+/**
+* Verifies a journal request
+*/
+private void verify(JournalInfo journalInfo) throws IOException {
+String errorMsg = null;
+int expectedNamespaceID = registration.getNamespaceID();
+if (journalInfo.getNamespaceId() != expectedNamespaceID) {
+errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
++ " actual " + journalInfo.getNamespaceId();
+LOG.warn(errorMsg);
+throw new UnregisteredNodeException(journalInfo);
+}
+if (!journalInfo.getClusterId().equals(registration.getClusterID())) {
+errorMsg = "Invalid clusterId in journal request - expected "
++ journalInfo.getClusterId() + " actual " + registration.getClusterID();
+LOG.warn(errorMsg);
+throw new UnregisteredNodeException(journalInfo);
+}
+}
+/**
+* Verifies a journal request
+*/
+private void verify(long e, JournalInfo journalInfo) throws IOException {
+verifyEpoch(e);
+verify(journalInfo);
+}
/**
* Register this service with the active namenode.
*/
@@ -298,4 +357,9 @@ public class JournalService implements JournalProtocol {
listener.verifyVersion(this, nsInfo);
registration.setStorageInfo(nsInfo);
}
-}
+@VisibleForTesting
+long getEpoch() {
+return epoch;
+}
+}

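Editorial note: the verifyFence(..) and verifyEpoch(..) checks above encode the fencing rule introduced by this patch: fence(..) succeeds only with an epoch strictly greater than the one currently held, while journal(..) and startLogSegment(..) must present exactly the current epoch. A standalone sketch of that rule follows; the EpochTracker class is illustrative and not part of the patch, though it reuses the FencedException type added here.

import org.apache.hadoop.hdfs.server.protocol.FencedException;

// Illustrative only: a stripped-down mirror of the epoch checks in JournalService.
class EpochTracker {
  private long epoch = 0;
  private String fencerInfo;

  // A new writer must present a strictly higher epoch to take over (cf. verifyFence).
  synchronized void fence(long newEpoch, String fencer) throws FencedException {
    if (newEpoch <= epoch) {
      throw new FencedException("Epoch " + newEpoch + " from fencer " + fencer
          + " is not valid. Resource has already been fenced by " + fencerInfo
          + " with epoch " + epoch);
    }
    epoch = newEpoch;
    fencerInfo = fencer;
  }

  // Journal traffic must carry exactly the current epoch (cf. verifyEpoch);
  // anything else means the caller has been fenced by a newer writer.
  synchronized void checkEpoch(long e) throws FencedException {
    if (e != epoch) {
      throw new FencedException("Epoch " + e + " is not valid. Resource has"
          + " already been fenced by " + fencerInfo + " with epoch " + epoch);
    }
  }
}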
View File

@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
import java.io.IOException;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
/**
@@ -26,19 +27,20 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
* to a BackupNode.
*/
class BackupJournalManager implements JournalManager {
-private final NamenodeRegistration nnReg;
private final NamenodeRegistration bnReg;
+private final JournalInfo journalInfo;
BackupJournalManager(NamenodeRegistration bnReg,
NamenodeRegistration nnReg) {
+journalInfo = new JournalInfo(nnReg.getLayoutVersion(),
+nnReg.getClusterID(), nnReg.getNamespaceID());
this.bnReg = bnReg;
-this.nnReg = nnReg;
}
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
-EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg);
+EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
+journalInfo);
stm.startLogSegment(txId);
return stm;
}

View File

@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -217,7 +219,8 @@ public class BackupNode extends NameNode {
}
/* @Override */// NameNode
-public boolean setSafeMode(SafeModeAction action) throws IOException {
+public boolean setSafeMode(@SuppressWarnings("unused") SafeModeAction action)
+throws IOException {
throw new UnsupportedActionException("setSafeMode");
}
@@ -236,51 +239,56 @@ public class BackupNode extends NameNode {
/**
* Verifies a journal request
-* @param nodeReg node registration
-* @throws UnregisteredNodeException if the registration is invalid
*/
-void verifyJournalRequest(NamenodeRegistration reg) throws IOException {
-verifyVersion(reg.getLayoutVersion());
+private void verifyJournalRequest(JournalInfo journalInfo)
+throws IOException {
+verifyVersion(journalInfo.getLayoutVersion());
String errorMsg = null;
int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
-if (reg.getNamespaceID() != expectedNamespaceID) {
+if (journalInfo.getNamespaceId() != expectedNamespaceID) {
errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
-+ " actual " + reg.getNamespaceID();
++ " actual " + journalInfo.getNamespaceId();
LOG.warn(errorMsg);
-throw new UnregisteredNodeException(reg);
+throw new UnregisteredNodeException(journalInfo);
}
-if (!reg.getClusterID().equals(namesystem.getClusterId())) {
+if (!journalInfo.getClusterId().equals(namesystem.getClusterId())) {
errorMsg = "Invalid clusterId in journal request - expected "
-+ reg.getClusterID() + " actual " + namesystem.getClusterId();
++ journalInfo.getClusterId() + " actual " + namesystem.getClusterId();
LOG.warn(errorMsg);
-throw new UnregisteredNodeException(reg);
+throw new UnregisteredNodeException(journalInfo);
}
}
/////////////////////////////////////////////////////
// BackupNodeProtocol implementation for backup node.
/////////////////////////////////////////////////////
@Override
-public void startLogSegment(NamenodeRegistration registration, long txid)
-throws IOException {
+public void startLogSegment(JournalInfo journalInfo, long epoch,
+long txid) throws IOException {
namesystem.checkOperation(OperationCategory.JOURNAL);
-verifyJournalRequest(registration);
+verifyJournalRequest(journalInfo);
getBNImage().namenodeStartedLogSegment(txid);
}
@Override
-public void journal(NamenodeRegistration nnReg,
-long firstTxId, int numTxns,
-byte[] records) throws IOException {
+public void journal(JournalInfo journalInfo, long epoch, long firstTxId,
+int numTxns, byte[] records) throws IOException {
namesystem.checkOperation(OperationCategory.JOURNAL);
-verifyJournalRequest(nnReg);
+verifyJournalRequest(journalInfo);
getBNImage().journal(firstTxId, numTxns, records);
}
private BackupImage getBNImage() {
return (BackupImage)nn.getFSImage();
}
+@Override
+public FenceResponse fence(JournalInfo journalInfo, long epoch,
+String fencerInfo) throws IOException {
+LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
+throw new UnsupportedOperationException(
+"BackupNode does not support fence");
+}
}
//////////////////////////////////////////////////////

View File

@@ -24,6 +24,7 @@ import java.util.Arrays;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.NameNodeProxies;
import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.io.DataOutputBuffer;
@@ -42,18 +43,18 @@ import org.apache.hadoop.security.UserGroupInformation;
class EditLogBackupOutputStream extends EditLogOutputStream {
static int DEFAULT_BUFFER_SIZE = 256;
-private JournalProtocol backupNode; // RPC proxy to backup node
-private NamenodeRegistration bnRegistration; // backup node registration
-private NamenodeRegistration nnRegistration; // active node registration
+private final JournalProtocol backupNode; // RPC proxy to backup node
+private final NamenodeRegistration bnRegistration; // backup node registration
+private final JournalInfo journalInfo; // active node registration
+private final DataOutputBuffer out; // serialized output sent to backup node
private EditsDoubleBuffer doubleBuf;
-private DataOutputBuffer out; // serialized output sent to backup node
EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
-NamenodeRegistration nnReg) // active name-node
+JournalInfo journalInfo) // active name-node
throws IOException {
super();
this.bnRegistration = bnReg;
-this.nnRegistration = nnReg;
+this.journalInfo = journalInfo;
InetSocketAddress bnAddress =
NetUtils.createSocketAddr(bnRegistration.getAddress());
try {
@@ -127,8 +128,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
out.reset();
assert out.getLength() == 0 : "Output buffer is not empty";
-backupNode.journal(nnRegistration,
-firstTxToFlush, numReadyTxns, data);
+backupNode.journal(journalInfo, 0, firstTxToFlush, numReadyTxns, data);
}
}
@@ -140,6 +140,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
}
void startLogSegment(long txId) throws IOException {
-backupNode.startLogSegment(nnRegistration, txId);
+backupNode.startLogSegment(journalInfo, 0, txId);
}
}

View File

@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Response to a journal fence request. See {@link JournalProtocol#fence}
*/
@InterfaceAudience.Private
public class FenceResponse {
private final long previousEpoch;
private final long lastTransactionId;
private final boolean isInSync;
public FenceResponse(long previousEpoch, long lastTransId, boolean inSync) {
this.previousEpoch = previousEpoch;
this.lastTransactionId = lastTransId;
this.isInSync = inSync;
}
public boolean isInSync() {
return isInSync;
}
public long getLastTransactionId() {
return lastTransactionId;
}
public long getPreviousEpoch() {
return previousEpoch;
}
}

View File

@@ -0,0 +1,32 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import java.io.IOException;
/**
* If a previous user of a shared resource tries to use it after being fenced
* by another user, this exception is thrown.
*/
public class FencedException extends IOException {
private static final long serialVersionUID = 1L;
public FencedException(String errorMsg) {
super(errorMsg);
}
}

View File

@@ -0,0 +1,48 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
/**
* Information that describes a journal
*/
@InterfaceAudience.Private
public class JournalInfo {
private final int layoutVersion;
private final String clusterId;
private final int namespaceId;
public JournalInfo(int lv, String clusterId, int nsId) {
this.layoutVersion = lv;
this.clusterId = clusterId;
this.namespaceId = nsId;
}
public int getLayoutVersion() {
return layoutVersion;
}
public String getClusterId() {
return clusterId;
}
public int getNamespaceId() {
return namespaceId;
}
}

View File

@@ -21,7 +21,6 @@ import java.io.IOException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.ipc.VersionedProtocol;
import org.apache.hadoop.security.KerberosInfo;
/**
@@ -53,12 +52,15 @@ public interface JournalProtocol {
* via {@code EditLogBackupOutputStream} in order to synchronize meta-data
* changes with the backup namespace image.
*
-* @param registration active node registration
+* @param journalInfo journal information
+* @param epoch marks the beginning of a new journal writer
* @param firstTxnId the first transaction of this batch
* @param numTxns number of transactions
* @param records byte array containing serialized journal records
+* @throws FencedException if the resource has been fenced
*/
-public void journal(NamenodeRegistration registration,
+public void journal(JournalInfo journalInfo,
+long epoch,
long firstTxnId,
int numTxns,
byte[] records) throws IOException;
@@ -66,9 +68,24 @@ public interface JournalProtocol {
/**
* Notify the BackupNode that the NameNode has rolled its edit logs
* and is now writing a new log segment.
-* @param registration the registration of the active NameNode
+* @param journalInfo journal information
+* @param epoch marks the beginning of a new journal writer
* @param txid the first txid in the new log
+* @throws FencedException if the resource has been fenced
*/
-public void startLogSegment(NamenodeRegistration registration,
+public void startLogSegment(JournalInfo journalInfo, long epoch,
long txid) throws IOException;
+/**
+* Request to fence any other journal writers.
+* Older writers at a previous epoch will be fenced and can no longer
+* perform journal operations.
+*
+* @param journalInfo journal information
+* @param epoch marks the beginning of a new journal writer
+* @param fencerInfo info about fencer for debugging purposes
+* @throws FencedException if the resource has been fenced
+*/
+public FenceResponse fence(JournalInfo journalInfo, long epoch,
+String fencerInfo) throws IOException;
}

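Editorial note: taken together, the interface above implies the calling sequence for a new journal writer: fence any previous writer with a higher epoch, then issue startLogSegment(..) and journal(..) calls carrying that same epoch. A hedged sketch under those assumptions follows; how the JournalProtocol proxy is obtained (for example via the JournalProtocolTranslatorPB shown earlier) and how epochs are allocated are outside this patch, and the class and fencerInfo string are hypothetical.

import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;

public class JournalWriterSketch {
  // Illustrative only: drive the protocol as a newly elected writer.
  static void takeOver(JournalProtocol proxy, JournalInfo info, long newEpoch)
      throws IOException {
    // 1. Fence older writers; the response reports the previous epoch,
    //    the receiver's last transaction id, and whether it is in sync.
    FenceResponse resp = proxy.fence(info, newEpoch, "writer-host:port");

    // 2. Subsequent calls must carry the same epoch, or the receiver
    //    rejects them with FencedException.
    long firstTxId = resp.getLastTransactionId() + 1;
    proxy.startLogSegment(info, newEpoch, firstTxId);

    byte[] records = new byte[0]; // serialized edit records would go here
    proxy.journal(info, newEpoch, firstTxId, 0, records);
  }
}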
View File

@@ -36,16 +36,18 @@ message JournalInfoProto {
}
/**
-* JournalInfo - the information about the journal
+* journalInfo - the information about the journal
* firstTxnId - the first txid in the journal records
* numTxns - Number of transactions in editlog
* records - bytes containing serialized journal records
+* epoch - a change to this value represents a change of journal writer
*/
message JournalRequestProto {
required JournalInfoProto journalInfo = 1;
required uint64 firstTxnId = 2;
required uint32 numTxns = 3;
required bytes records = 4;
+required uint64 epoch = 5;
}
/**
@@ -55,12 +57,13 @@ message JournalResponseProto {
}
/**
-* JournalInfo - the information about the journal
+* journalInfo - the information about the journal
* txid - first txid in the new log
*/
message StartLogSegmentRequestProto {
-required JournalInfoProto journalInfo = 1;
-required uint64 txid = 2;
+required JournalInfoProto journalInfo = 1; // Info about the journal
+required uint64 txid = 2; // Transaction ID
+required uint64 epoch = 3;
}
/**
@@ -69,6 +72,27 @@ message StartLogSegmentRequestProto {
message StartLogSegmentResponseProto {
}
+/**
+* journalInfo - the information about the journal
+* epoch - a change to this value represents a change of journal writer
+* fencerInfo - info about the fencer, for debugging
+*/
+message FenceRequestProto {
+required JournalInfoProto journalInfo = 1; // Info about the journal
+required uint64 epoch = 2; // Epoch - change indicates change in writer
+optional string fencerInfo = 3; // Info about fencer for debugging
+}
+/**
+* previousEpoch - previous epoch if any or zero
+* lastTransactionId - last valid transaction Id in the journal
+* inSync - if all journal segments are available and in sync
+*/
+message FenceResponseProto {
+optional uint64 previousEpoch = 1;
+optional uint64 lastTransactionId = 2;
+optional bool inSync = 3;
+}
/**
* Protocol used to journal edits to a remote node. Currently,
* this is used to publish edits from the NameNode to a BackupNode.
@@ -89,4 +113,10 @@ service JournalProtocolService {
*/
rpc startLogSegment(StartLogSegmentRequestProto)
returns (StartLogSegmentResponseProto);
+/**
+* Request to fence a journal receiver.
+*/
+rpc fence(FenceRequestProto)
+returns (FenceResponseProto);
}

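Editorial note: for reference, a sketch of the same fence exchange at the protobuf layer, using the generated builders for the two new messages. The builder and accessor names follow the protobuf-java conventions already used in the translators above; the class name and field values are illustrative.

import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalInfoProto;

public class FenceMessageSketch {
  public static void main(String[] args) {
    // clusterID is required; layoutVersion and namespaceID are optional
    // (PBHelper guards them with hasLayoutVersion()/hasNamespaceID()).
    JournalInfoProto info = JournalInfoProto.newBuilder()
        .setClusterID("test-cluster")
        .setLayoutVersion(-40)
        .setNamespaceID(12345)
        .build();

    // Request sent by the fencer; fencerInfo is optional free-form debug text.
    FenceRequestProto req = FenceRequestProto.newBuilder()
        .setJournalInfo(info)
        .setEpoch(2)
        .setFencerInfo("writer-host:port")
        .build();

    // Response from the journal receiver; all three fields are optional.
    FenceResponseProto resp = FenceResponseProto.newBuilder()
        .setPreviousEpoch(1)
        .setLastTransactionId(0)
        .setInSync(false)
        .build();

    System.out.println("fence epoch " + req.getEpoch()
        + ", previous epoch " + resp.getPreviousEpoch());
  }
}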
View File

@@ -20,12 +20,18 @@ package org.apache.hadoop.hdfs.server.journalservice;
import java.io.IOException;
import java.net.InetSocketAddress;
+import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileSystemTestHelper;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.FencedException;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.junit.Test;
import org.mockito.Mockito;
@@ -42,7 +48,7 @@ public class TestJournalService {
* called.
*/
@Test
-public void testCallBacks() throws IOException {
+public void testCallBacks() throws Exception {
JournalListener listener = Mockito.mock(JournalListener.class);
JournalService service = null;
try {
@@ -51,6 +57,7 @@ public class TestJournalService {
service = startJournalService(listener);
verifyRollLogsCallback(service, listener);
verifyJournalCallback(service, listener);
+verifyFence(service, cluster.getNameNode(0));
} finally {
if (service != null) {
service.stop();
@@ -93,4 +100,28 @@ public class TestJournalService {
Mockito.verify(l, Mockito.atLeastOnce()).journal(Mockito.eq(s),
Mockito.anyLong(), Mockito.anyInt(), (byte[]) Mockito.any());
}
+public void verifyFence(JournalService s, NameNode nn) throws Exception {
+String cid = nn.getNamesystem().getClusterId();
+int nsId = nn.getNamesystem().getFSImage().getNamespaceID();
+int lv = nn.getNamesystem().getFSImage().getLayoutVersion();
+// Fence the journal service
+JournalInfo info = new JournalInfo(lv, cid, nsId);
+long currentEpoch = s.getEpoch();
+// New epoch lower than the current epoch is rejected
+try {
+s.fence(info, (currentEpoch - 1), "fencer");
+} catch (FencedException ignore) { /* Ignored */ }
+// New epoch equal to the current epoch is rejected
+try {
+s.fence(info, currentEpoch, "fencer");
+} catch (FencedException ignore) { /* Ignored */ }
+// New epoch higher than the current epoch is successful
+FenceResponse resp = s.fence(info, currentEpoch+1, "fencer");
+Assert.assertNotNull(resp);
+}
}