HDFS-3793. Implement genericized format() in QJM. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3077@1373177 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-08-15 00:48:11 +00:00
parent 4b67401565
commit f765fdb657
19 changed files with 274 additions and 33 deletions

View File

@ -14,3 +14,5 @@ HDFS-3725. Fix QJM startup when individual JNs have gaps (todd)
HDFS-3741. Exhaustive failure injection test for skipped RPCs (todd)
HDFS-3773. TestNNWithQJM fails after HDFS-3741. (atm)
HDFS-3793. Implement genericized format() in QJM (todd)

View File

@ -82,6 +82,12 @@ interface AsyncLogger {
*/
public ListenableFuture<Void> purgeLogsOlderThan(long minTxIdToKeep);
/**
* Format the log directory.
* @param nsInfo the namespace info to format with
*/
public ListenableFuture<Void> format(NamespaceInfo nsInfo);
/**
* @return the state of the last epoch on the target node.
*/

View File

@ -25,18 +25,23 @@ import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PrepareRecoveryResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.SegmentStateProto;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.ipc.RemoteException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Maps;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.SettableFuture;
/**
* Wrapper around a set of Loggers, taking care of fanning out
@ -197,6 +202,36 @@ class AsyncLoggerSet {
}
return QuorumCall.create(calls);
}
public QuorumCall<AsyncLogger, Boolean> isFormatted() {
Map<AsyncLogger, ListenableFuture<Boolean>> calls = Maps.newHashMap();
for (AsyncLogger logger : loggers) {
final SettableFuture<Boolean> ret = SettableFuture.create();
ListenableFuture<GetJournalStateResponseProto> jstate =
logger.getJournalState();
Futures.addCallback(jstate, new FutureCallback<GetJournalStateResponseProto>() {
@Override
public void onFailure(Throwable t) {
if (t instanceof RemoteException) {
t = ((RemoteException)t).unwrapRemoteException();
}
if (t instanceof JournalNotFormattedException) {
ret.set(false);
} else {
ret.setException(t);
}
}
@Override
public void onSuccess(GetJournalStateResponseProto jstate) {
ret.set(true);
}
});
calls.put(logger, ret);
}
return QuorumCall.create(calls);
}
private QuorumCall<AsyncLogger,NewEpochResponseProto> newEpoch(
NamespaceInfo nsInfo,
@ -275,4 +310,15 @@ class AsyncLoggerSet {
}
return QuorumCall.create(calls);
}
QuorumCall<AsyncLogger,Void> format(NamespaceInfo nsInfo) {
Map<AsyncLogger, ListenableFuture<Void>> calls =
Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<Void> future =
logger.format(nsInfo);
calls.put(logger, future);
}
return QuorumCall.create(calls);
}
}

View File

@ -280,6 +280,17 @@ public class IPCLoggerChannel implements AsyncLogger {
queuedEditsSizeBytes -= size;
}
@Override
public ListenableFuture<Void> format(final NamespaceInfo nsInfo) {
return executor.submit(new Callable<Void>() {
@Override
public Void call() throws Exception {
getProxy().format(journalId, nsInfo);
return null;
}
});
}
@Override
public ListenableFuture<Void> startLogSegment(final long txid) {
return executor.submit(new Callable<Void>() {

View File

@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.PriorityQueue;
import java.util.concurrent.TimeoutException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -68,6 +69,12 @@ public class QuorumJournalManager implements JournalManager {
private final int acceptRecoveryTimeoutMs;
private final int finalizeSegmentTimeoutMs;
private final int selectInputStreamsTimeoutMs;
// Since these don't occur during normal operation, we can
// use rather lengthy timeouts, and don't need to make them
// configurable.
private static final int FORMAT_TIMEOUT_MS = 60000;
private static final int HASDATA_TIMEOUT_MS = 60000;
private final Configuration conf;
private final URI uri;
@ -133,6 +140,52 @@ public class QuorumJournalManager implements JournalManager {
"bad journal id: " + jid);
}
@Override
public void format(NamespaceInfo nsInfo) throws IOException {
QuorumCall<AsyncLogger,Void> call = loggers.format(nsInfo);
try {
call.waitFor(loggers.size(), loggers.size(), 0, FORMAT_TIMEOUT_MS);
} catch (InterruptedException e) {
throw new IOException("Interrupted waiting for format() response");
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for format() response");
}
if (call.countExceptions() > 0) {
call.rethrowException("Could not format one or more JournalNodes");
}
}
@Override
public boolean hasSomeData() throws IOException {
QuorumCall<AsyncLogger, Boolean> call =
loggers.isFormatted();
try {
call.waitFor(loggers.size(), 0, 0, HASDATA_TIMEOUT_MS);
} catch (InterruptedException e) {
throw new IOException("Interrupted while determining if JNs have data");
} catch (TimeoutException e) {
throw new IOException("Timed out waiting for response from loggers");
}
if (call.countExceptions() > 0) {
call.rethrowException(
"Unable to check if JNs are ready for formatting");
}
// If any of the loggers returned with a non-empty manifest, then
// we should prompt for format.
for (Boolean hasData : call.getResults().values()) {
if (hasData) {
return true;
}
}
// Otherwise, none were formatted, we can safely format.
return false;
}
/**
* Run recovery/synchronization for a specific segment.
* Postconditions:
@ -278,7 +331,7 @@ public class QuorumJournalManager implements JournalManager {
}
return addrs;
}
@Override
public EditLogOutputStream startLogSegment(long txId) throws IOException {
Preconditions.checkState(isActiveWriter,

View File

@ -0,0 +1,35 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.qjournal.protocol;
import org.apache.hadoop.classification.InterfaceAudience;
import java.io.IOException;
/**
* Exception indicating that a call has been made to a JournalNode
* which is not yet formatted.
*/
@InterfaceAudience.Private
public class JournalNotFormattedException extends IOException {
private static final long serialVersionUID = 1L;
public JournalNotFormattedException(String msg) {
super(msg);
}
}

View File

@ -54,6 +54,12 @@ public interface QJournalProtocol {
public GetJournalStateResponseProto getJournalState(String journalId)
throws IOException;
/**
* Format the underlying storage for the given namespace.
*/
public void format(String journalId,
NamespaceInfo nsInfo) throws IOException;
/**
* Begin a new epoch. See the HDFS-3077 design doc for details.
*/

View File

@ -23,6 +23,8 @@ import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@ -90,6 +92,17 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
}
}
public FormatResponseProto format(RpcController controller,
FormatRequestProto request) throws ServiceException {
try {
impl.format(request.getJid().getIdentifier(),
PBHelper.convert(request.getNsInfo()));
return FormatResponseProto.getDefaultInstance();
} catch (IOException ioe) {
throw new ServiceException(ioe);
}
}
/** @see JournalProtocol#journal */
@Override
public JournalResponseProto journal(RpcController unused,

View File

@ -27,6 +27,7 @@ import org.apache.hadoop.hdfs.protocolPB.PBHelper;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FinalizeLogSegmentRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.FormatRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestRequestProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetEditLogManifestResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.GetJournalStateRequestProto;
@ -93,6 +94,19 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
.setIdentifier(jid)
.build();
}
@Override
public void format(String jid, NamespaceInfo nsInfo) throws IOException {
try {
FormatRequestProto req = FormatRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.setNsInfo(PBHelper.convert(nsInfo))
.build();
rpcProxy.format(NULL_CONTROLLER, req);
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
@Override
public NewEpochResponseProto newEpoch(String jid, NamespaceInfo nsInfo,

View File

@ -105,37 +105,28 @@ class JNStorage extends Storage {
setStorageInfo(nsInfo);
LOG.info("Formatting journal storage directory " +
sd + " with nsid: " + getNamespaceID());
// Unlock the directory before formatting, because we will
// re-analyze it after format(). The analyzeStorage() call
// below is reponsible for re-locking it. This is a no-op
// if the storage is not currently locked.
unlockAll();
sd.clearDirectory();
writeProperties(sd);
if (!getPaxosDir().mkdirs()) {
throw new IOException("Could not create paxos dir: " + getPaxosDir());
}
}
public void formatIfNecessary(NamespaceInfo nsInfo) throws IOException {
if (state == StorageState.NOT_FORMATTED ||
state == StorageState.NON_EXISTENT) {
format(nsInfo);
analyzeStorage();
assert state == StorageState.NORMAL :
"Unexpected state after formatting: " + state;
} else {
Preconditions.checkState(state == StorageState.NORMAL,
"Unhandled storage state in %s: %s", this, state);
assert getNamespaceID() != 0;
checkConsistentNamespace(nsInfo);
}
analyzeStorage();
}
private void analyzeStorage() throws IOException {
void analyzeStorage() throws IOException {
this.state = sd.analyzeStorage(StartupOption.REGULAR, this);
if (state == StorageState.NORMAL) {
readProperties(sd);
}
}
private void checkConsistentNamespace(NamespaceInfo nsInfo)
void checkConsistentNamespace(NamespaceInfo nsInfo)
throws IOException {
if (nsInfo.getNamespaceID() != getNamespaceID()) {
throw new IOException("Incompatible namespaceID for journal " +
@ -155,4 +146,8 @@ class JNStorage extends Storage {
LOG.info("Closing journal storage for " + sd);
unlockAll();
}
public boolean isFormatted() {
return state == StorageState.NORMAL;
}
}

View File

@ -30,6 +30,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.qjournal.protocol.JournalNotFormattedException;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocol;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.NewEpochResponseProto;
import org.apache.hadoop.hdfs.qjournal.protocol.QJournalProtocolProtos.PersistedRecoveryPaxosData;
@ -114,6 +115,8 @@ class Journal implements Closeable {
Preconditions.checkState(nsInfo.getNamespaceID() != 0,
"can't format with uninitialized namespace info: %s",
nsInfo);
LOG.info("Formatting " + this + " with namespace info: " +
nsInfo);
storage.format(nsInfo);
}
@ -134,6 +137,7 @@ class Journal implements Closeable {
* any lower epoch, or 0 if no promises have been made.
*/
synchronized long getLastPromisedEpoch() throws IOException {
checkFormatted();
return lastPromisedEpoch.get();
}
@ -150,9 +154,8 @@ class Journal implements Closeable {
synchronized NewEpochResponseProto newEpoch(
NamespaceInfo nsInfo, long epoch) throws IOException {
// If the storage is unformatted, format it with this NS.
// Otherwise, check that the NN's nsinfo matches the storage.
storage.formatIfNecessary(nsInfo);
checkFormatted();
storage.checkConsistentNamespace(nsInfo);
if (epoch <= getLastPromisedEpoch()) {
throw new IOException("Proposed epoch " + epoch + " <= last promise " +
@ -185,6 +188,7 @@ class Journal implements Closeable {
synchronized void journal(RequestInfo reqInfo, long firstTxnId,
int numTxns, byte[] records) throws IOException {
checkRequest(reqInfo);
checkFormatted();
// TODO: if a JN goes down and comes back up, then it will throw
// this exception on every edit. We should instead send back
@ -226,6 +230,13 @@ class Journal implements Closeable {
// TODO: some check on serial number that they only increase from a given
// client
}
private void checkFormatted() throws JournalNotFormattedException {
if (!storage.isFormatted()) {
throw new JournalNotFormattedException("Journal " + storage +
" not formatted");
}
}
/**
* Start a new segment at the given txid. The previous segment
@ -235,6 +246,7 @@ class Journal implements Closeable {
throws IOException {
assert fjm != null;
checkRequest(reqInfo);
checkFormatted();
Preconditions.checkState(curSegment == null,
"Can't start a log segment, already writing " + curSegment);
@ -251,6 +263,7 @@ class Journal implements Closeable {
public synchronized void finalizeLogSegment(RequestInfo reqInfo, long startTxId,
long endTxId) throws IOException {
checkRequest(reqInfo);
checkFormatted();
if (startTxId == curSegmentTxId) {
if (curSegment != null) {
@ -284,6 +297,7 @@ class Journal implements Closeable {
public synchronized void purgeLogsOlderThan(RequestInfo reqInfo,
long minTxIdToKeep) throws IOException {
checkRequest(reqInfo);
checkFormatted();
fjm.purgeLogsOlderThan(minTxIdToKeep);
purgePaxosDecisionsOlderThan(minTxIdToKeep);
@ -320,6 +334,8 @@ class Journal implements Closeable {
throws IOException {
// No need to checkRequest() here - anyone may ask for the list
// of segments.
checkFormatted();
RemoteEditLogManifest manifest = new RemoteEditLogManifest(
fjm.getRemoteEditLogs(sinceTxId));
return manifest;
@ -360,6 +376,7 @@ class Journal implements Closeable {
public synchronized PrepareRecoveryResponseProto prepareRecovery(
RequestInfo reqInfo, long segmentTxId) throws IOException {
checkRequest(reqInfo);
checkFormatted();
PrepareRecoveryResponseProto.Builder builder =
PrepareRecoveryResponseProto.newBuilder();
@ -388,6 +405,7 @@ class Journal implements Closeable {
SegmentStateProto segment, URL fromUrl)
throws IOException {
checkRequest(reqInfo);
checkFormatted();
long segmentTxId = segment.getStartTxId();
// TODO: right now, a recovery of a segment when the log is

View File

@ -108,6 +108,11 @@ class JournalNodeRpcServer implements QJournalProtocol {
return jn.getOrCreateJournal(journalId).newEpoch(nsInfo, epoch);
}
@Override
public void format(String journalId, NamespaceInfo nsInfo)
throws IOException {
jn.getOrCreateJournal(journalId).format(nsInfo);
}
@Override
public void journal(RequestInfo reqInfo, long firstTxnId,

View File

@ -109,6 +109,17 @@ message GetJournalStateResponseProto {
required uint32 httpPort = 2;
}
/**
* format()
*/
message FormatRequestProto {
required JournalIdProto jid = 1;
required NamespaceInfoProto nsInfo = 2;
}
message FormatResponseProto {
}
/**
* newEpoch()
*/
@ -178,6 +189,8 @@ service QJournalProtocolService {
rpc newEpoch(NewEpochRequestProto) returns (NewEpochResponseProto);
rpc format(FormatRequestProto) returns (FormatResponseProto);
rpc journal(JournalRequestProto) returns (JournalResponseProto);
rpc startLogSegment(StartLogSegmentRequestProto)

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.ExitUtil;
@ -157,19 +158,26 @@ public class TestNNWithQJM {
conf.set(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY,
mjc.getQuorumJournalURI("myjournal").toString());
// Start a NN, so the storage is formatted with its namespace info.
// Start a NN, so the storage is formatted -- both on-disk
// and QJM.
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.manageNameDfsDirs(false)
.build();
cluster.shutdown();
// Create a new (freshly-formatted) NN, which should not be able to
// reuse the same journal, since its journal ID would not match.
// Reformat just the on-disk portion
Configuration onDiskOnly = new Configuration(conf);
onDiskOnly.unset(DFSConfigKeys.DFS_NAMENODE_EDITS_DIR_KEY);
NameNode.format(onDiskOnly);
// Start the NN - should fail because the JNs are still formatted
// with the old namespace ID.
try {
cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(0)
.manageNameDfsDirs(false)
.format(false)
.build();
fail("New NN with different namespace should have been rejected");
} catch (IOException ioe) {

View File

@ -54,6 +54,10 @@ public class TestEpochsAreUnique {
Configuration conf = new Configuration();
MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
URI uri = cluster.getQuorumJournalURI(JID);
QuorumJournalManager qjm = new QuorumJournalManager(
conf, uri, FAKE_NSINFO);
qjm.format(FAKE_NSINFO);
try {
// With no failures or contention, epochs should increase one-by-one
for (int i = 0; i < 5; i++) {

View File

@ -76,6 +76,7 @@ public class TestQJMWithFaults {
MiniJournalCluster cluster = new MiniJournalCluster.Builder(conf).build();
try {
QuorumJournalManager qjm = createInjectableQJM(cluster);
qjm.format(FAKE_NSINFO);
doWorkload(cluster, qjm);
SortedSet<Integer> ipcCounts = Sets.newTreeSet();
@ -118,6 +119,7 @@ public class TestQJMWithFaults {
try {
QuorumJournalManager qjm;
qjm = createInjectableQJM(cluster);
qjm.format(FAKE_NSINFO);
List<AsyncLogger> loggers = qjm.getLoggerSetForTests().getLoggersForTests();
failIpcNumber(loggers.get(0), failA);
failIpcNumber(loggers.get(1), failB);

View File

@ -84,6 +84,7 @@ public class TestQuorumJournalManager {
qjm = createSpyingQJM();
spies = qjm.getLoggerSetForTests().getLoggersForTests();
qjm.format(QJMTestUtil.FAKE_NSINFO);
qjm.recoverUnfinalizedSegments();
assertEquals(1, qjm.getLoggerSetForTests().getEpoch());
}
@ -109,6 +110,15 @@ public class TestQuorumJournalManager {
checkRecovery(cluster, 4, 4);
}
@Test
public void testFormat() throws Exception {
QuorumJournalManager qjm = new QuorumJournalManager(
conf, cluster.getQuorumJournalURI("testFormat-jid"), FAKE_NSINFO);
assertFalse(qjm.hasSomeData());
qjm.format(FAKE_NSINFO);
assertTrue(qjm.hasSomeData());
}
@Test
public void testReaderWhileAnotherWrites() throws Exception {

View File

@ -85,6 +85,8 @@ public class TestQuorumJournalManagerUnit {
futureReturns(
NewEpochResponseProto.newBuilder().build()
).when(logger).newEpoch(Mockito.anyLong());
futureReturns(null).when(logger).format(Mockito.<NamespaceInfo>any());
}
qjm.recoverUnfinalizedSegments();

View File

@ -61,6 +61,7 @@ public class TestJournal {
public void setup() throws Exception {
FileUtil.fullyDelete(TEST_LOG_DIR);
journal = new Journal(TEST_LOG_DIR, mockErrorReporter);
journal.format(FAKE_NSINFO);
}
@After
@ -130,18 +131,14 @@ public class TestJournal {
@Test
public void testJournalLocking() throws Exception {
Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
StorageDirectory sd = journal.getStorage().getStorageDir(0);
File lockFile = new File(sd.getRoot(), Storage.STORAGE_FILE_LOCK);
// Journal should not be locked, since we lazily initialize it.
assertFalse(lockFile.exists());
// Journal should be locked, since the format() call locks it.
GenericTestUtils.assertExists(lockFile);
journal.newEpoch(FAKE_NSINFO, 1);
Assume.assumeTrue(journal.getStorage().getStorageDir(0).isLockSupported());
// Journal should be locked
GenericTestUtils.assertExists(lockFile);
try {
new Journal(TEST_LOG_DIR, mockErrorReporter);
fail("Did not fail to create another journal in same dir");
@ -153,6 +150,7 @@ public class TestJournal {
journal.close();
// Journal should no longer be locked after the close() call.
// Hence, should be able to create a new Journal in the same dir.
Journal journal2 = new Journal(TEST_LOG_DIR, mockErrorReporter);
journal2.newEpoch(FAKE_NSINFO, 2);
}