diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
index ca65f2b5748..da8b9d6e1b7 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.HDFS-3077.txt
@@ -14,3 +14,5 @@ HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)
 HDFS-3741. Exhaustive failure injection test for skipped RPCs (todd)
 
 HDFS-3773. TestNNWithQJM fails after HDFS-3741. (atm)
+
+HDFS-3793. Implement genericized format() in QJM (todd)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
index 0544f302463..2c9d5c05b88 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLogger.java
@@ -82,6 +82,12 @@ interface AsyncLogger {
    */
   public ListenableFuture<Void> purgeLogsOlderThan(long minTxIdToKeep);
 
+  /**
+   * Format the log directory.
+   * @param nsInfo the namespace info to format with
+   */
+  public ListenableFuture<Void> format(NamespaceInfo nsInfo);
+
   /**
    * @return the state of the last epoch on the target node.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
index a89200700cc..d0ca5ab096e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/AsyncLoggerSet.java
@@ -25,18 +25,23 @@ import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
 import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
 import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
+import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Maps;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
 import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
 
 /**
  * Wrapper around a set of Loggers, taking care of fanning out
@@ -197,6 +202,36 @@ class AsyncLoggerSet {
     }
     return QuorumCall.create(calls);
   }
+
+  public QuorumCall<AsyncLogger, Boolean> isFormatted() {
+    Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      final SettableFuture<Boolean> ret = SettableFuture.create();
+      ListenableFuture<GetJournalStateResponseProto> jstate =
+          logger.getJournalState();
+      Futures.addCallback(jstate, new FutureCallback<GetJournalStateResponseProto>() {
+        @Override
+        public void onFailure(Throwable t) {
+          if (t instanceof RemoteException) {
+            t =
+                ((RemoteException)t).unwrapRemoteException();
+          }
+          if (t instanceof JournalNotFormattedException) {
+            ret.set(false);
+          } else {
+            ret.setException(t);
+          }
+        }
+
+        @Override
+        public void onSuccess(GetJournalStateResponseProto jstate) {
+          ret.set(true);
+        }
+      });
+
+      calls.put(logger, ret);
+    }
+    return QuorumCall.create(calls);
+  }
 
   private QuorumCall<AsyncLogger, NewEpochResponseProto> newEpoch(
       NamespaceInfo nsInfo,
@@ -275,4 +310,15 @@ class AsyncLoggerSet {
     }
     return QuorumCall.create(calls);
   }
+
+  QuorumCall<AsyncLogger, Void> format(NamespaceInfo nsInfo) {
+    Map<AsyncLogger, ListenableFuture<Void>> calls =
+        Maps.newHashMap();
+    for (AsyncLogger logger : loggers) {
+      ListenableFuture<Void> future =
+          logger.format(nsInfo);
+      calls.put(logger, future);
+    }
+    return QuorumCall.create(calls);
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
index 1ff945c38ea..f17ad5e5ef2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/IPCLoggerChannel.java
@@ -280,6 +280,17 @@ public class IPCLoggerChannel implements AsyncLogger {
     queuedEditsSizeBytes -= size;
   }
 
+  @Override
+  public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
+    return executor.submit(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        getProxy().format(journalId, nsInfo);
+        return null;
+      }
+    });
+  }
+
   @Override
   public ListenableFuture<Void> startLogSegment(final long txid) {
     return executor.submit(new Callable<Void>() {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
index 7846e59d6dd..1692771d452 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/client/QuorumJournalManager.java
@@ -28,6 +28,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.PriorityQueue;
+import java.util.concurrent.TimeoutException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -68,6 +69,12 @@ public class QuorumJournalManager implements JournalManager {
   private final int acceptRecoveryTimeoutMs;
   private final int finalizeSegmentTimeoutMs;
   private final int selectInputStreamsTimeoutMs;
+
+  // Since these don't occur during normal operation, we can
+  // use rather lengthy timeouts, and don't need to make them
+  // configurable.
+  private static final int FORMAT_TIMEOUT_MS = 60000;
+  private static final int HASDATA_TIMEOUT_MS = 60000;
 
   private final Configuration conf;
   private final URI uri;
@@ -133,6 +140,52 @@ public class QuorumJournalManager implements JournalManager {
         "bad journal id: " + jid);
   }
 
+  @Override
+  public void format(NamespaceInfo nsInfo) throws IOException {
+    QuorumCall<AsyncLogger, Void> call = loggers.format(nsInfo);
+    try {
+      call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted waiting for format() response");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for format() response");
+    }
+
+    if (call.countExceptions() > 0) {
+      call.rethrowException("Could not format one or more JournalNodes");
+    }
+  }
+
+  @Override
+  public boolean hasSomeData() throws IOException {
+    QuorumCall<AsyncLogger, Boolean> call =
+        loggers.isFormatted();
+
+    try {
+      call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS);
+    } catch (InterruptedException e) {
+      throw new IOException("Interrupted while determining if JNs have data");
+    } catch (TimeoutException e) {
+      throw new IOException("Timed out waiting for response from loggers");
+    }
+
+    if (call.countExceptions() > 0) {
+      call.rethrowException(
+          "Unable to check if JNs are ready for formatting");
+    }
+
+    // If any of the loggers reported formatted storage, then there is
+    // existing data, and we should prompt before formatting.
+    for (Boolean hasData : call.getResults().values()) {
+      if (hasData) {
+        return true;
+      }
+    }
+
+    // Otherwise, none were formatted, we can safely format.
+    return false;
+  }
+
   /**
    * Run recovery/synchronization for a specific segment.
    * Postconditions:
@@ -278,7 +331,7 @@ public class QuorumJournalManager implements JournalManager {
     }
     return addrs;
   }
-  
+
   @Override
   public EditLogOutputStream startLogSegment(long txId) throws IOException {
     Preconditions.checkState(isActiveWriter,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java
new file mode 100644
index 00000000000..874d6481f8e
--- /dev/null
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/JournalNotFormattedException.java
@@ -0,0 +1,35 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.qjournal.protocol;
+
+import org.apache.hadoop.classification.InterfaceAudience;
+import java.io.IOException;
+
+/**
+ * Exception indicating that a call has been made to a JournalNode
+ * which is not yet formatted.
+ */
+@InterfaceAudience.Private
+public class JournalNotFormattedException extends IOException {
+  private static final long serialVersionUID = 1L;
+
+  public JournalNotFormattedException(String msg) {
+    super(msg);
+  }
+
+}
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
index c9136429e0a..ccea160e1e4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocol/QJournalProtocol.java
@@ -54,6 +54,12 @@ public interface QJournalProtocol {
   public GetJournalStateResponseProto getJournalState(String journalId)
       throws IOException;
 
+  /**
+   * Format the underlying storage for the given namespace.
+   */
+  public void format(String journalId,
+      NamespaceInfo nsInfo) throws IOException;
+
   /**
    * Begin a new epoch. See the HDFS-3077 design doc for details.
    */
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
index e6a68ad37a8..854cfd906bb 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolServerSideTranslatorPB.java
@@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -90,6 +92,17 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolPB {
     }
   }
 
+  public FormatResponseProto format(RpcController controller,
+      FormatRequestProto request) throws ServiceException {
+    try {
+      impl.format(request.getJid().getIdentifier(),
+          PBHelper.convert(request.getNsInfo()));
+      return FormatResponseProto.getDefaultInstance();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
   /** @see JournalProtocol#journal */
   @Override
   public JournalResponseProto journal(RpcController unused,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
index 0805773994e..eab835eacf2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/protocolPB/QJournalProtocolTranslatorPB.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
+import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@@ -93,6 +94,19 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
         .setIdentifier(jid)
         .build();
   }
+
+  @Override
+  public void format(String jid, NamespaceInfo nsInfo) throws IOException {
+    try {
+      FormatRequestProto req = FormatRequestProto.newBuilder()
+          .setJid(convertJournalId(jid))
+          .setNsInfo(PBHelper.convert(nsInfo))
+          .build();
+      rpcProxy.format(NULL_CONTROLLER, req);
+    } catch (ServiceException e) {
+      throw ProtobufHelper.getRemoteException(e);
+    }
+  }
 
   @Override
   public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
index 48bb09d4b57..5207228aa65 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JNStorage.java
@@ -105,37 +105,28 @@ class JNStorage extends Storage {
     setStorageInfo(nsInfo);
     LOG.info("Formatting journal storage directory " + sd + " with nsid: " + getNamespaceID());
+    // Unlock the directory before formatting, because we will
+    // re-analyze it after format(). The analyzeStorage() call
+    // below is responsible for re-locking it. This is a no-op
+    // if the storage is not currently locked.
+    unlockAll();
     sd.clearDirectory();
     writeProperties(sd);
     if (!getPaxosDir().mkdirs()) {
       throw new IOException("Could not create paxos dir: " + getPaxosDir());
     }
-  }
-
-  public void formatIfNecessary(NamespaceInfo nsInfo) throws IOException {
-    if (state == StorageState.NOT_FORMATTED ||
-        state == StorageState.NON_EXISTENT) {
-      format(nsInfo);
-      analyzeStorage();
-      assert state == StorageState.NORMAL :
-        "Unexpected state after formatting: " + state;
-    } else {
-      Preconditions.checkState(state == StorageState.NORMAL,
-          "Unhandled storage state in %s: %s", this, state);
-      assert getNamespaceID() != 0;
-
-      checkConsistentNamespace(nsInfo);
-    }
+    analyzeStorage();
   }
-
-  private void analyzeStorage() throws IOException {
+
+  void analyzeStorage() throws IOException {
     this.state = sd.analyzeStorage(StartupOption.REGULAR, this);
     if (state == StorageState.NORMAL) {
       readProperties(sd);
     }
   }
 
-  private void checkConsistentNamespace(NamespaceInfo nsInfo)
+  void checkConsistentNamespace(NamespaceInfo nsInfo)
       throws IOException {
     if (nsInfo.getNamespaceID() != getNamespaceID()) {
       throw new IOException("Incompatible namespaceID for journal " +
@@ -155,4 +146,8 @@ class JNStorage extends Storage {
     LOG.info("Closing journal storage for " + sd);
     unlockAll();
   }
+
+  public boolean isFormatted() {
+    return state == StorageState.NORMAL;
+  }
 }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
index 897f591dd2a..3be95455e79 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/Journal.java
@@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileUtil;
 import org.apache.hadoop.hdfs.protocol.HdfsConstants;
+import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
 import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
@@ -114,6 +115,8 @@ class Journal implements Closeable {
     Preconditions.checkState(nsInfo.getNamespaceID() != 0,
         "can't format with uninitialized namespace info: %s",
         nsInfo);
+    LOG.info("Formatting " + this + " with namespace info: " +
+        nsInfo);
     storage.format(nsInfo);
   }
 
@@ -134,6 +137,7 @@ class Journal implements Closeable {
    * any lower epoch, or 0 if no promises have been made.
    */
   synchronized long getLastPromisedEpoch() throws IOException {
+    checkFormatted();
     return lastPromisedEpoch.get();
   }
 
@@ -150,9 +154,8 @@ class Journal implements Closeable {
   synchronized NewEpochResponseProto newEpoch(
       NamespaceInfo nsInfo, long epoch) throws IOException {
-    // If the storage is unformatted, format it with this NS.
-    // Otherwise, check that the NN's nsinfo matches the storage.
-    storage.formatIfNecessary(nsInfo);
+    checkFormatted();
+    storage.checkConsistentNamespace(nsInfo);
 
     if (epoch <= getLastPromisedEpoch()) {
       throw new IOException("Proposed epoch " + epoch + " <= last promise " +
@@ -185,6 +188,7 @@ class Journal implements Closeable {
   synchronized void journal(RequestInfo reqInfo, long firstTxnId,
       int numTxns, byte[] records) throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
 
     // TODO: if a JN goes down and comes back up, then it will throw
     // this exception on every edit. We should instead send back
@@ -226,6 +230,13 @@ class Journal implements Closeable {
     // TODO: some check on serial number that they only increase from a given
     // client
   }
+
+  private void checkFormatted() throws JournalNotFormattedException {
+    if (!storage.isFormatted()) {
+      throw new JournalNotFormattedException("Journal " + storage +
+          " not formatted");
+    }
+  }
 
   /**
    * Start a new segment at the given txid. The previous segment
@@ -235,6 +246,7 @@ class Journal implements Closeable {
       throws IOException {
     assert fjm != null;
     checkRequest(reqInfo);
+    checkFormatted();
 
     Preconditions.checkState(curSegment == null,
         "Can't start a log segment, already writing " + curSegment);
@@ -251,6 +263,7 @@ class Journal implements Closeable {
   public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
       long endTxId) throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
 
     if (startTxId == curSegmentTxId) {
       if (curSegment != null) {
@@ -284,6 +297,7 @@ class Journal implements Closeable {
   public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
       long minTxIdToKeep) throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
 
     fjm.purgeLogsOlderThan(minTxIdToKeep);
     purgePaxosDecisionsOlderThan(minTxIdToKeep);
@@ -320,6 +334,8 @@ class Journal implements Closeable {
       throws IOException {
     // No need to checkRequest() here - anyone may ask for the list
     // of segments.
+    checkFormatted();
+
     RemoteEditLogManifest manifest = new RemoteEditLogManifest(
         fjm.getRemoteEditLogs(sinceTxId));
     return manifest;
@@ -360,6 +376,7 @@ class Journal implements Closeable {
   public synchronized PrepareRecoveryResponseProto prepareRecovery(
       RequestInfo reqInfo, long segmentTxId) throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
 
     PrepareRecoveryResponseProto.Builder builder =
         PrepareRecoveryResponseProto.newBuilder();
@@ -388,6 +405,7 @@ class Journal implements Closeable {
       SegmentStateProto segment, URL fromUrl)
       throws IOException {
     checkRequest(reqInfo);
+    checkFormatted();
 
     long segmentTxId = segment.getStartTxId();
     // TODO: right now, a recovery of a segment when the log is
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
index 114fe277dda..f03ceadb7aa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/qjournal/server/JournalNodeRpcServer.java
@@ -108,6 +108,11 @@ class JournalNodeRpcServer implements QJournalProtocol {
     return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch);
   }
 
+  @Override
+  public void format(String journalId, NamespaceInfo nsInfo)
+      throws IOException {
+    jn.getOrCreateJournal(journalId).format(nsInfo);
+  }
 
   @Override
   public void journal(RequestInfo reqInfo, long firstTxnId,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
index 38da49150a0..c6a28e82f39 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/QJournalProtocol.proto
@@ -109,6 +109,17 @@ message GetJournalStateResponseProto {
   required uint32 httpPort = 2;
 }
 
+/**
+ * format()
+ */
+message FormatRequestProto {
+  required JournalIdProto jid = 1;
+  required NamespaceInfoProto nsInfo = 2;
+}
+
+message FormatResponseProto {
+}
+
 /**
  * newEpoch()
  */
@@ -178,6 +189,8 @@ service QJournalProtocolService {
   rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
 
+  rpc format(FormatRequestProto) returns (FormatResponseProto);
+
   rpc journal(JournalRequestProto) returns (JournalResponseProto);
 
   rpc startLogSegment(StartLogSegmentRequestProto)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
index 80c1db277a5..82f8dc17286 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/TestNNWithQJM.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.HdfsConfiguration;
 import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.hdfs.server.namenode.NameNode;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.test.GenericTestUtils;
 import org.apache.hadoop.util.ExitUtil;
@@ -157,19 +158,26 @@ public class TestNNWithQJM {
     conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
         mjc.getQuorumJournalURI("myjournal").toString());
 
-    // Start a NN, so the storage is formatted with its namespace info.
+    // Start a NN, so the storage is formatted -- both on-disk
+    // and QJM.
     MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
       .numDataNodes(0)
       .manageNameDfsDirs(false)
       .build();
     cluster.shutdown();
 
-    // Create a new (freshly-formatted) NN, which should not be able to
-    // reuse the same journal, since its journal ID would not match.
+    // Reformat just the on-disk portion
+    Configuration onDiskOnly = new Configuration(conf);
+    onDiskOnly.unset(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
+    NameNode.format(onDiskOnly);
+
+    // Start the NN - should fail because the JNs are still formatted
+    // with the old namespace ID.
     try {
       cluster = new MiniDFSCluster.Builder(conf)
         .numDataNodes(0)
         .manageNameDfsDirs(false)
+        .format(false)
         .build();
       fail("New NN with different namespace should have been rejected");
     } catch (IOException ioe) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
index 4cad9bef142..c18599e9ea4 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestEpochsAreUnique.java
@@ -54,6 +54,10 @@ public class TestEpochsAreUnique {
     Configuration conf = new Configuration();
     MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
     URI uri = cluster.getQuorumJournalURI(JID);
+    QuorumJournalManager qjm = new QuorumJournalManager(
+        conf, uri, FAKE_NSINFO);
+    qjm.format(FAKE_NSINFO);
+
     try {
       // With no failures or contention, epochs should increase one-by-one
       for (int i = 0; i < 5; i++) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
index 08020c6025e..3bbea007767 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQJMWithFaults.java
@@ -76,6 +76,7 @@ public class TestQJMWithFaults {
     MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
     try {
       QuorumJournalManager qjm = createInjectableQJM(cluster);
+      qjm.format(FAKE_NSINFO);
       doWorkload(cluster, qjm);
 
       SortedSet<Integer> ipcCounts = Sets.newTreeSet();
@@ -118,6 +119,7 @@ public class TestQJMWithFaults {
     try {
       QuorumJournalManager qjm;
       qjm = createInjectableQJM(cluster);
+      qjm.format(FAKE_NSINFO);
       List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
       failIpcNumber(loggers.get(0), failA);
       failIpcNumber(loggers.get(1), failB);
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
index 96b04198d30..891bd42bfe2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManager.java
@@ -84,6 +84,7 @@ public class TestQuorumJournalManager {
 
     qjm = createSpyingQJM();
     spies = qjm.getLoggerSetForTests().getLoggersForTests();
+    qjm.format(QJMTestUtil.FAKE_NSINFO);
     qjm.recoverUnfinalizedSegments();
     assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
   }
@@ -109,6 +110,15 @@ public class TestQuorumJournalManager {
     checkRecovery(cluster, 4, 4);
   }
 
+  @Test
+  public void testFormat() throws Exception {
+    QuorumJournalManager qjm = new QuorumJournalManager(
+        conf, cluster.getQuorumJournalURI("testFormat-jid"), FAKE_NSINFO);
+    assertFalse(qjm.hasSomeData());
+    qjm.format(FAKE_NSINFO);
+    assertTrue(qjm.hasSomeData());
+  }
+
   @Test
   public void testReaderWhileAnotherWrites() throws Exception {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
index b25097b6310..13e4d645d47 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/client/TestQuorumJournalManagerUnit.java
@@ -85,6 +85,8 @@ public class TestQuorumJournalManagerUnit {
       futureReturns(
           NewEpochResponseProto.newBuilder().build()
       ).when(logger).newEpoch(Mockito.anyLong());
+
+      futureReturns(null).when(logger).format(Mockito.<NamespaceInfo>any());
     }
 
     qjm.recoverUnfinalizedSegments();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
index e193966b7a0..9bfdf2e5ebf 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/qjournal/server/TestJournal.java
@@ -61,6 +61,7 @@ public class TestJournal {
   public void setup() throws Exception {
     FileUtil.fullyDelete(TEST_LOG_DIR);
     journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
+    journal.format(FAKE_NSINFO);
  }
 
   @After
@@ -130,18 +131,14 @@ public class TestJournal {
 
   @Test
   public void testJournalLocking() throws Exception {
+    Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
     StorageDirectory sd = journal.getStorage().getStorageDir(0);
     File lockFile = new File(sd.getRoot(), Storage.STORAGE_FILE_LOCK);
-
-    // Journal should not be locked, since we lazily initialize it.
-    assertFalse(lockFile.exists());
+
+    // Journal should be locked, since the format() call locks it.
+    GenericTestUtils.assertExists(lockFile);
 
     journal.newEpoch(FAKE_NSINFO, 1);
-    Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
-
-    // Journal should be locked
-    GenericTestUtils.assertExists(lockFile);
-
     try {
       new Journal(TEST_LOG_DIR, mockErrorReporter);
       fail("Did not fail to create another journal in same dir");
@@ -153,6 +150,7 @@ public class TestJournal {
     journal.close();
 
     // Journal should no longer be locked after the close() call.
+    // Hence, should be able to create a new Journal in the same dir.
    Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter);
     journal2.newEpoch(FAKE_NSINFO, 2);
   }
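
To see how the two new JournalManager methods fit together from a caller's point of view, here is a minimal end-to-end sketch, mirroring testFormat() above. It is illustrative only and not part of the patch: the class name and the qjournal:// URI are placeholders, and it borrows the QJMTestUtil.FAKE_NSINFO test fixture referenced in the tests above rather than a real NamespaceInfo.

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.qjournal.QJMTestUtil;
    import org.apache.hadoop.hdfs.qjournal.client.QuorumJournalManager;
    import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;

    public class QjmFormatSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Placeholder address; a real deployment lists its own JournalNodes.
        URI uri = new URI("qjournal://jn1:8485;jn2:8485;jn3:8485/myjournal");
        NamespaceInfo nsInfo = QJMTestUtil.FAKE_NSINFO;

        QuorumJournalManager qjm = new QuorumJournalManager(conf, uri, nsInfo);
        if (qjm.hasSomeData()) {
          // Some JN answered getJournalState() without throwing
          // JournalNotFormattedException, so a real caller (the NN's
          // format path) should prompt before clobbering existing edits.
          System.err.println("JournalNodes already contain data; aborting");
          return;
        }
        // Fans a FormatRequestProto out to every JN; per the patch, all of
        // them must succeed, not just a quorum.
        qjm.format(nsInfo);
      }
    }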
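A subtlety worth noting is that format() and hasSomeData() demand different thresholds from waitFor(). The annotated reading below is a best-effort interpretation of the call sites in QuorumJournalManager above; the parameter-role comments are descriptive guesses, since QuorumCall itself is not touched by this patch.

    // format(): formatting only a quorum would strand the remaining JNs
    // with a stale namespace ID, so every logger must respond and succeed.
    call.waitFor(loggers.size(),    // responses required: all
                 loggers.size(),    // successes required: all
                 0,                 // exceptions tolerated: none
                 FORMAT_TIMEOUT_MS);

    // hasSomeData(): every logger must respond, but "not formatted" is a
    // legitimate answer -- AsyncLoggerSet.isFormatted() maps the unwrapped
    // JournalNotFormattedException to false -- so no successes are demanded
    // up front, and countExceptions() is consulted afterwards for real
    // failures.
    call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS);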