From c0795cd10713e5e0bdc4f87b129ea91fde8bc033 Mon Sep 17 00:00:00 2001
From: Tsz-wo Sze
Date: Sat, 12 May 2012 01:04:44 +0000
Subject: [PATCH] svn merge -c 1310649 from trunk for HDFS-3211. Add fence(..)
 and replace NamenodeRegistration with JournalInfo and epoch in
 JournalProtocol.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1337423 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |  3 ++
 .../protocol/UnregisteredNodeException.java   |  5 ++
 ...JournalProtocolServerSideTranslatorPB.java | 24 ++++++++--
 .../JournalProtocolTranslatorPB.java          | 29 +++++++++--
 .../hadoop/hdfs/protocolPB/PBHelper.java      | 21 ++++----
 .../server/namenode/BackupJournalManager.java | 10 ++--
 .../hdfs/server/namenode/BackupNode.java      | 46 ++++++++++--------
 .../namenode/EditLogBackupOutputStream.java   | 18 +++----
 .../hdfs/server/protocol/FenceResponse.java   | 48 +++++++++++++++++++
 .../hdfs/server/protocol/FencedException.java | 32 +++++++++++++
 .../hdfs/server/protocol/JournalInfo.java     | 48 +++++++++++++++++++
 .../hdfs/server/protocol/JournalProtocol.java | 27 +++++++++--
 .../src/main/proto/JournalProtocol.proto      | 38 +++++++++++++--
 13 files changed, 286 insertions(+), 63 deletions(-)
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java
 create mode 100644 hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java

diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index c0ce1d44f56..95091e6a6f1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -315,6 +315,9 @@ Release 2.0.0 - UNRELEASED
     HDFS-3400. DNs should be able start with jsvc even if security is disabled.
     (atm via eli)
 
+    HDFS-3211. Add fence(..) and replace NamenodeRegistration with JournalInfo
+    and epoch in JournalProtocol.  (suresh via szetszwo)
+
   OPTIMIZATIONS
 
     HDFS-2477. Optimize computing the diff between a block report and the
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
index 941a320a795..eabdd22a974 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocol/UnregisteredNodeException.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 
 /**
@@ -33,6 +34,10 @@ import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
 public class UnregisteredNodeException extends IOException {
   private static final long serialVersionUID = -5620209396945970810L;
 
+  public UnregisteredNodeException(JournalInfo info) {
+    super("Unregistered server: " + info.toString());
+  }
+
   public UnregisteredNodeException(NodeRegistration nodeReg) {
     super("Unregistered server: " + nodeReg.toString());
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java
index 1858e70980d..1805d146640 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolServerSideTranslatorPB.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.hdfs.protocolPB;
 import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 
 import com.google.protobuf.RpcController;
@@ -48,9 +51,8 @@ public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB
   public JournalResponseProto journal(RpcController unused,
       JournalRequestProto req) throws ServiceException {
     try {
-      impl.journal(PBHelper.convert(req.getJournalInfo()),
-          req.getFirstTxnId(), req.getNumTxns(), req.getRecords()
-              .toByteArray());
+      impl.journal(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
+          req.getFirstTxnId(), req.getNumTxns(), req.getRecords().toByteArray());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
@@ -63,10 +65,24 @@ public class JournalProtocolServerSideTranslatorPB implements JournalProtocolPB
       StartLogSegmentRequestProto req) throws ServiceException {
     try {
       impl.startLogSegment(PBHelper.convert(req.getJournalInfo()),
-          req.getTxid());
+          req.getEpoch(), req.getTxid());
     } catch (IOException e) {
       throw new ServiceException(e);
     }
     return StartLogSegmentResponseProto.newBuilder().build();
   }
+
+  @Override
+  public FenceResponseProto fence(RpcController controller,
+      FenceRequestProto req) throws ServiceException {
+    try {
+      FenceResponse resp = impl.fence(PBHelper.convert(req.getJournalInfo()), req.getEpoch(),
+          req.getFencerInfo());
+      return FenceResponseProto.newBuilder().setInSync(resp.isInSync())
+          .setLastTransactionId(resp.getLastTransactionId())
+          .setPreviousEpoch(resp.getPreviousEpoch()).build();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
index 9258180e52a..d14e4e22fe0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/JournalProtocolTranslatorPB.java
@@ -22,10 +22,13 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceRequestProto;
+import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.FenceResponseProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.JournalRequestProto;
 import org.apache.hadoop.hdfs.protocol.proto.JournalProtocolProtos.StartLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
-import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.ipc.ProtobufHelper;
 import org.apache.hadoop.ipc.ProtocolMetaInterface;
 import org.apache.hadoop.ipc.RPC;
@@ -58,10 +61,11 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
   }
 
   @Override
-  public void journal(NamenodeRegistration reg, long firstTxnId,
+  public void journal(JournalInfo journalInfo, long epoch, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
     JournalRequestProto req = JournalRequestProto.newBuilder()
-        .setJournalInfo(PBHelper.convertToJournalInfo(reg))
+        .setJournalInfo(PBHelper.convert(journalInfo))
+        .setEpoch(epoch)
         .setFirstTxnId(firstTxnId)
         .setNumTxns(numTxns)
         .setRecords(PBHelper.getByteString(records))
@@ -74,10 +78,11 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
   }
 
   @Override
-  public void startLogSegment(NamenodeRegistration registration, long txid)
+  public void startLogSegment(JournalInfo journalInfo, long epoch, long txid)
       throws IOException {
     StartLogSegmentRequestProto req = StartLogSegmentRequestProto.newBuilder()
-        .setJournalInfo(PBHelper.convertToJournalInfo(registration))
+        .setJournalInfo(PBHelper.convert(journalInfo))
+        .setEpoch(epoch)
         .setTxid(txid)
         .build();
     try {
@@ -86,6 +91,20 @@ public class JournalProtocolTranslatorPB implements ProtocolMetaInterface,
       throw ProtobufHelper.getRemoteException(e);
     }
   }
+
+  @Override
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException {
+    FenceRequestProto req = FenceRequestProto.newBuilder().setEpoch(epoch)
+        .setJournalInfo(PBHelper.convert(journalInfo)).build();
+    try {
+      FenceResponseProto resp = rpcProxy.fence(NULL_CONTROLLER, req);
+      return new FenceResponse(resp.getPreviousEpoch(),
+          resp.getLastTransactionId(), resp.getInSync());
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   @Override
   public boolean isMethodSupported(String methodName) throws IOException {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
index 72f37ee5e70..92b7858c007 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java
@@ -110,6 +110,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlockCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand;
 import org.apache.hadoop.hdfs.server.protocol.BlockRecoveryCommand.RecoveringBlock;
 import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
+import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.CheckpointCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol;
@@ -117,6 +118,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
 import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
 import org.apache.hadoop.hdfs.server.protocol.FinalizeCommand;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -127,7 +129,6 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat
 import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
-import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
 import org.apache.hadoop.hdfs.server.protocol.UpgradeCommand;
 import org.apache.hadoop.io.EnumSetWritable;
 import org.apache.hadoop.io.Text;
@@ -1349,25 +1350,19 @@ public class PBHelper {
         .setStorageID(r.getStorageID()).build();
   }
 
-  public static NamenodeRegistration convert(JournalInfoProto info) {
+  public static JournalInfo convert(JournalInfoProto info) {
     int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
     int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;
-    StorageInfo storage = new StorageInfo(lv, nsID, info.getClusterID(), 0);
-
-    // Note that the role is always {@link NamenodeRole#NAMENODE} as this
-    // conversion happens for messages from Namenode to Journal receivers.
-    // Addresses in the registration are unused.
-    return new NamenodeRegistration("", "", storage, NamenodeRole.NAMENODE);
+    return new JournalInfo(lv, info.getClusterID(), nsID);
   }
 
   /**
   * Method used for converting {@link JournalInfo} to {@link JournalInfoProto}
   * to be sent from Namenode to Journal receivers.
  */
-  public static JournalInfoProto convertToJournalInfo(
-      NamenodeRegistration reg) {
-    return JournalInfoProto.newBuilder().setClusterID(reg.getClusterID())
-        .setLayoutVersion(reg.getLayoutVersion())
-        .setNamespaceID(reg.getNamespaceID()).build();
+  public static JournalInfoProto convert(JournalInfo j) {
+    return JournalInfoProto.newBuilder().setClusterID(j.getClusterId())
+        .setLayoutVersion(j.getLayoutVersion())
+        .setNamespaceID(j.getNamespaceId()).build();
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
index de75b769345..ebf4f480f35 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupJournalManager.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hdfs.server.namenode;
 
 import java.io.IOException;
 
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 
 /**
@@ -26,19 +27,20 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
  * to a BackupNode.
  */
 class BackupJournalManager implements JournalManager {
-
-  private final NamenodeRegistration nnReg;
   private final NamenodeRegistration bnReg;
+  private final JournalInfo journalInfo;
 
   BackupJournalManager(NamenodeRegistration bnReg,
       NamenodeRegistration nnReg) {
+    journalInfo = new JournalInfo(nnReg.getLayoutVersion(),
+        nnReg.getClusterID(), nnReg.getNamespaceID());
     this.bnReg = bnReg;
-    this.nnReg = nnReg;
   }
 
   @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
-    EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg, nnReg);
+    EditLogBackupOutputStream stm = new EditLogBackupOutputStream(bnReg,
+        journalInfo);
     stm.startLogSegment(txId);
     return stm;
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
index 47b577d007e..699b1eef5c0 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/BackupNode.java
@@ -35,6 +35,8 @@ import org.apache.hadoop.hdfs.protocolPB.JournalProtocolPB;
 import org.apache.hadoop.hdfs.protocolPB.JournalProtocolServerSideTranslatorPB;
 import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
@@ -207,7 +209,8 @@ public class BackupNode extends NameNode {
   }
 
   /* @Override */// NameNode
-  public boolean setSafeMode(SafeModeAction action) throws IOException {
+  public boolean setSafeMode(@SuppressWarnings("unused") SafeModeAction action)
+      throws IOException {
     throw new UnsupportedActionException("setSafeMode");
   }
 
@@ -226,51 +229,56 @@ public class BackupNode extends NameNode {
 
   /**
    * Verifies a journal request
-   * @param nodeReg node registration
-   * @throws UnregisteredNodeException if the registration is invalid
    */
-  void verifyJournalRequest(NamenodeRegistration reg) throws IOException {
-    verifyLayoutVersion(reg.getLayoutVersion());
+  private void verifyJournalRequest(JournalInfo journalInfo)
+      throws IOException {
+    verifyLayoutVersion(journalInfo.getLayoutVersion());
     String errorMsg = null;
     int expectedNamespaceID = namesystem.getNamespaceInfo().getNamespaceID();
-    if (reg.getNamespaceID() != expectedNamespaceID) {
+    if (journalInfo.getNamespaceId() != expectedNamespaceID) {
       errorMsg = "Invalid namespaceID in journal request - expected " + expectedNamespaceID
-          + " actual " + reg.getNamespaceID();
+          + " actual " + journalInfo.getNamespaceId();
       LOG.warn(errorMsg);
-      throw new UnregisteredNodeException(reg);
+      throw new UnregisteredNodeException(journalInfo);
     }
-    if (!reg.getClusterID().equals(namesystem.getClusterId())) {
+    if (!journalInfo.getClusterId().equals(namesystem.getClusterId())) {
       errorMsg = "Invalid clusterId in journal request - expected "
-          + reg.getClusterID() + " actual " + namesystem.getClusterId();
+          + journalInfo.getClusterId() + " actual " + namesystem.getClusterId();
       LOG.warn(errorMsg);
-      throw new UnregisteredNodeException(reg);
+      throw new UnregisteredNodeException(journalInfo);
     }
   }
-
   /////////////////////////////////////////////////////
   // BackupNodeProtocol implementation for backup node.
   /////////////////////////////////////////////////////
   @Override
-  public void startLogSegment(NamenodeRegistration registration, long txid)
-      throws IOException {
+  public void startLogSegment(JournalInfo journalInfo, long epoch,
+      long txid) throws IOException {
     namesystem.checkOperation(OperationCategory.JOURNAL);
-    verifyJournalRequest(registration);
+    verifyJournalRequest(journalInfo);
     getBNImage().namenodeStartedLogSegment(txid);
   }
 
   @Override
-  public void journal(NamenodeRegistration nnReg,
-      long firstTxId, int numTxns,
-      byte[] records) throws IOException {
+  public void journal(JournalInfo journalInfo, long epoch, long firstTxId,
+      int numTxns, byte[] records) throws IOException {
     namesystem.checkOperation(OperationCategory.JOURNAL);
-    verifyJournalRequest(nnReg);
+    verifyJournalRequest(journalInfo);
     getBNImage().journal(firstTxId, numTxns, records);
   }
 
   private BackupImage getBNImage() {
     return (BackupImage)nn.getFSImage();
   }
+
+  @Override
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException {
+    LOG.info("Fenced by " + fencerInfo + " with epoch " + epoch);
+    throw new UnsupportedOperationException(
+        "BackupNode does not support fence");
+  }
 }
 
 //////////////////////////////////////////////////////
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
index bdb4c5e7732..5a28f7c512d 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/EditLogBackupOutputStream.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.NameNodeProxies;
 import org.apache.hadoop.hdfs.server.common.Storage;
+import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
 import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;
 import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
 import org.apache.hadoop.io.DataOutputBuffer;
@@ -42,18 +43,18 @@ import org.apache.hadoop.security.UserGroupInformation;
 class EditLogBackupOutputStream extends EditLogOutputStream {
   static int DEFAULT_BUFFER_SIZE = 256;
 
-  private JournalProtocol backupNode; // RPC proxy to backup node
-  private NamenodeRegistration bnRegistration; // backup node registration
-  private NamenodeRegistration nnRegistration; // active node registration
+  private final JournalProtocol backupNode; // RPC proxy to backup node
+  private final NamenodeRegistration bnRegistration; // backup node registration
+  private final JournalInfo journalInfo; // journal info of the active node
+  private final DataOutputBuffer out; // serialized output sent to backup node
   private EditsDoubleBuffer doubleBuf;
-  private DataOutputBuffer out; // serialized output sent to backup node
 
   EditLogBackupOutputStream(NamenodeRegistration bnReg, // backup node
-      NamenodeRegistration nnReg) // active name-node
+      JournalInfo journalInfo) // active name-node
       throws IOException {
     super();
     this.bnRegistration = bnReg;
-    this.nnRegistration = nnReg;
+    this.journalInfo = journalInfo;
     InetSocketAddress bnAddress =
         NetUtils.createSocketAddr(bnRegistration.getAddress());
     try {
@@ -127,8 +128,7 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
       out.reset();
       assert out.getLength() == 0 : "Output buffer is not empty";
 
-      backupNode.journal(nnRegistration,
-          firstTxToFlush, numReadyTxns, data);
+      backupNode.journal(journalInfo, 0, firstTxToFlush, numReadyTxns, data);
     }
   }
 
@@ -140,6 +140,6 @@ class EditLogBackupOutputStream extends EditLogOutputStream {
   }
 
   void startLogSegment(long txId) throws IOException {
-    backupNode.startLogSegment(nnRegistration, txId);
+    backupNode.startLogSegment(journalInfo, 0, txId);
   }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java
new file mode 100644
index 00000000000..5bbd76dd882
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FenceResponse.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Response to a journal fence request. See {@link JournalProtocol#fence}
+ */
+@InterfaceAudience.Private
+public class FenceResponse {
+  private final long previousEpoch;
+  private final long lastTransactionId;
+  private final boolean isInSync;
+
+  public FenceResponse(long previousEpoch, long lastTransId, boolean inSync) {
+    this.previousEpoch = previousEpoch;
+    this.lastTransactionId = lastTransId;
+    this.isInSync = inSync;
+  }
+
+  public boolean isInSync() {
+    return isInSync;
+  }
+
+  public long getLastTransactionId() {
+    return lastTransactionId;
+  }
+
+  public long getPreviousEpoch() {
+    return previousEpoch;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java
new file mode 100644
index 00000000000..2f9f54bd7e3
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/FencedException.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import java.io.IOException;
+
+/**
+ * Thrown when a previous user of a shared resource attempts to use it after
+ * being fenced by another user.
+ */
+public class FencedException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public FencedException(String errorMsg) {
+    super(errorMsg);
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java
new file mode 100644
index 00000000000..530934d2372
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalInfo.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.server.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+
+/**
+ * Information that describes a journal.
+ */
+@InterfaceAudience.Private
+public class JournalInfo {
+  private final int layoutVersion;
+  private final String clusterId;
+  private final int namespaceId;
+
+  public JournalInfo(int lv, String clusterId, int nsId) {
+    this.layoutVersion = lv;
+    this.clusterId = clusterId;
+    this.namespaceId = nsId;
+  }
+
+  public int getLayoutVersion() {
+    return layoutVersion;
+  }
+
+  public String getClusterId() {
+    return clusterId;
+  }
+
+  public int getNamespaceId() {
+    return namespaceId;
+  }
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java
index b9d55151f88..be514b96efe 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/JournalProtocol.java
@@ -21,7 +21,6 @@ import java.io.IOException;
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
-import org.apache.hadoop.ipc.VersionedProtocol;
 import org.apache.hadoop.security.KerberosInfo;
 
 /**
@@ -53,12 +52,15 @@ public interface JournalProtocol {
    * via {@code EditLogBackupOutputStream} in order to synchronize meta-data
    * changes with the backup namespace image.
    *
-   * @param registration active node registration
+   * @param journalInfo journal information
+   * @param epoch marks the beginning of a new journal writer
    * @param firstTxnId the first transaction of this batch
   * @param numTxns number of transactions
   * @param records byte array containing serialized journal records
+   * @throws FencedException if the resource has been fenced
    */
-  public void journal(NamenodeRegistration registration,
+  public void journal(JournalInfo journalInfo,
+                      long epoch,
                       long firstTxnId,
                       int numTxns,
                       byte[] records) throws IOException;
@@ -66,9 +68,24 @@ public interface JournalProtocol {
   /**
    * Notify the BackupNode that the NameNode has rolled its edit logs
    * and is now writing a new log segment.
-   * @param registration the registration of the active NameNode
+   * @param journalInfo journal information
+   * @param epoch marks the beginning of a new journal writer
    * @param txid the first txid in the new log
+   * @throws FencedException if the resource has been fenced
    */
-  public void startLogSegment(NamenodeRegistration registration,
+  public void startLogSegment(JournalInfo journalInfo, long epoch,
       long txid) throws IOException;
+
+  /**
+   * Request to fence any other journal writers.
+   * Older writers at a previous epoch will be fenced and can no longer
+   * perform journal operations.
+   *
+   * @param journalInfo journal information
+   * @param epoch marks the beginning of a new journal writer
+   * @param fencerInfo info about fencer for debugging purposes
+   * @throws FencedException if the resource has been fenced
+   */
+  public FenceResponse fence(JournalInfo journalInfo, long epoch,
+      String fencerInfo) throws IOException;
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto
index c15347190e8..1e720bab059 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/JournalProtocol.proto
@@ -36,16 +36,18 @@ message JournalInfoProto {
 }
 
 /**
- * JournalInfo - the information about the journal
+ * journalInfo - the information about the journal
  * firstTxnId - the first txid in the journal records
  * numTxns - Number of transactions in editlog
  * records - bytes containing serialized journal records
+ * epoch - a change in this value indicates a change of journal writer
  */
 message JournalRequestProto {
   required JournalInfoProto journalInfo = 1;
   required uint64 firstTxnId = 2;
   required uint32 numTxns = 3;
   required bytes records = 4;
+  required uint64 epoch = 5;
 }
 
 /**
@@ -55,12 +57,13 @@ message JournalResponseProto {
 }
 
 /**
- * JournalInfo - the information about the journal
+ * journalInfo - the information about the journal
  * txid - first txid in the new log
  */
 message StartLogSegmentRequestProto {
-  required JournalInfoProto journalInfo = 1;
-  required uint64 txid = 2;
+  required JournalInfoProto journalInfo = 1; // Info about the journal
+  required uint64 txid = 2; // Transaction ID
+  required uint64 epoch = 3; // Epoch - change indicates change in writer
 }
 
 /**
@@ -69,6 +72,27 @@ message StartLogSegmentRequestProto {
 message StartLogSegmentResponseProto {
 }
 
+/**
+ * journalInfo - the information about the journal
+ * epoch - the epoch of the fencing writer
+ */
+message FenceRequestProto {
+  required JournalInfoProto journalInfo = 1; // Info about the journal
+  required uint64 epoch = 2; // Epoch - change indicates change in writer
+  optional string fencerInfo = 3; // Info about fencer for debugging
+}
+
+/**
+ * previousEpoch - previous epoch if any or zero
+ * lastTransactionId - last valid transaction Id in the journal
+ * inSync - if all journal segments are available and in sync
+ */
+message FenceResponseProto {
+  optional uint64 previousEpoch = 1;
+  optional uint64 lastTransactionId = 2;
+  optional bool inSync = 3;
+}
+
 /**
  * Protocol used to journal edits to a remote node. Currently,
  * this is used to publish edits from the NameNode to a BackupNode.
@@ -89,4 +113,10 @@ service JournalProtocolService {
    */
   rpc startLogSegment(StartLogSegmentRequestProto)
       returns (StartLogSegmentResponseProto);
+
+  /**
+   * Request to fence a journal receiver.
+   */
+  rpc fence(FenceRequestProto)
+      returns (FenceResponseProto);
 }
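
Usage sketch (not part of the patch): the code below illustrates the fencing handshake a new journal writer is expected to perform with the fence(..) call introduced above. Only the JournalProtocol, JournalInfo, FenceResponse, and FencedException signatures come from this change; the proxy, the epoch value, the fencer string, and the txid bookkeeping are illustrative assumptions.

import java.io.IOException;

import org.apache.hadoop.hdfs.server.protocol.FenceResponse;
import org.apache.hadoop.hdfs.server.protocol.FencedException;
import org.apache.hadoop.hdfs.server.protocol.JournalInfo;
import org.apache.hadoop.hdfs.server.protocol.JournalProtocol;

// Hypothetical caller of the new fencing protocol; not part of this patch.
class FencingUsageSketch {
  static void takeOverJournal(JournalProtocol proxy, int layoutVersion,
      String clusterId, int namespaceId, long myEpoch) throws IOException {
    JournalInfo info = new JournalInfo(layoutVersion, clusterId, namespaceId);

    // Fence writers at older epochs. After this call the receiver rejects
    // journal(..) and startLogSegment(..) requests carrying a smaller epoch.
    FenceResponse resp = proxy.fence(info, myEpoch, "fencer@host");

    // Resume after the last transaction the receiver has recorded.
    long nextTxId = resp.getLastTransactionId() + 1;
    proxy.startLogSegment(info, myEpoch, nextTxId);

    byte[] records = new byte[0]; // serialized edit records would go here
    try {
      proxy.journal(info, myEpoch, nextTxId, 0, records);
    } catch (FencedException e) {
      // A writer with a higher epoch has fenced this one; it must stop.
    }
  }
}

Note that in this patch itself, BackupNode rejects fence(..) with UnsupportedOperationException and EditLogBackupOutputStream always passes epoch 0, so the epoch checks only become meaningful for journal services that later implement fencing on top of this protocol.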