HDFS-4298. StorageRetentionManager spews warnings when used with QJM. Contributed by Aaron T. Myers.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1485371 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2013-05-22 19:37:41 +00:00
parent aa11e05a64
commit 8c62c46046
24 changed files with 97 additions and 44 deletions

View File

@ -95,8 +95,8 @@ public abstract class GenericTestUtils {
Set<String> expectedSet = Sets.newTreeSet(
Arrays.asList(expectedMatches));
Assert.assertEquals("Bad files matching " + pattern + " in " + dir,
Joiner.on(",").join(found),
Joiner.on(",").join(expectedSet));
Joiner.on(",").join(expectedSet),
Joiner.on(",").join(found));
}
public static void assertExceptionContains(String string, Throwable t) {

View File

@ -954,6 +954,8 @@ Release 2.0.5-beta - UNRELEASED
HDFS-4824. FileInputStreamCache.close leaves dangling reference to
FileInputStreamCache.cacheCleaner. (Colin Patrick McCabe via todd)
HDFS-4298. StorageRetentionManager spews warnings when used with QJM. (atm)
BREAKDOWN OF HDFS-347 SUBTASKS AND RELATED JIRAS
HDFS-4353. Encapsulate connections to peers in Peer and PeerServer classes.

View File

@ -500,9 +500,15 @@ public class BookKeeperJournalManager implements JournalManager {
}
}
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) throws IOException {
selectInputStreams(streams, fromTxId, inProgressOk, true);
}
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk, boolean forReading)
throws IOException {
List<EditLogLedgerMetadata> currentLedgerList = getLedgerList(fromTxId,
inProgressOk);
try {

View File

@ -109,7 +109,7 @@ interface AsyncLogger {
* Fetch the list of edit logs available on the remote node.
*/
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
long fromTxnId);
long fromTxnId, boolean forReading);
/**
* Prepare recovery. See the HDFS-3077 design document for details.

View File

@ -263,13 +263,13 @@ class AsyncLoggerSet {
}
public QuorumCall<AsyncLogger, RemoteEditLogManifest>
getEditLogManifest(long fromTxnId) {
getEditLogManifest(long fromTxnId, boolean forReading) {
Map<AsyncLogger,
ListenableFuture<RemoteEditLogManifest>> calls
= Maps.newHashMap();
for (AsyncLogger logger : loggers) {
ListenableFuture<RemoteEditLogManifest> future =
logger.getEditLogManifest(fromTxnId);
logger.getEditLogManifest(fromTxnId, forReading);
calls.put(logger, future);
}
return QuorumCall.create(calls);

View File

@ -519,12 +519,12 @@ public class IPCLoggerChannel implements AsyncLogger {
@Override
public ListenableFuture<RemoteEditLogManifest> getEditLogManifest(
final long fromTxnId) {
final long fromTxnId, final boolean forReading) {
return executor.submit(new Callable<RemoteEditLogManifest>() {
@Override
public RemoteEditLogManifest call() throws IOException {
GetEditLogManifestResponseProto ret = getProxy().getEditLogManifest(
journalId, fromTxnId);
journalId, fromTxnId, forReading);
// Update the http port, since we need this to build URLs to any of the
// returned logs.
httpPort = ret.getHttpPort();

View File

@ -445,13 +445,18 @@ public class QuorumJournalManager implements JournalManager {
public void close() throws IOException {
loggers.close();
}
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk) throws IOException {
selectInputStreams(streams, fromTxnId, inProgressOk, true);
}
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk) throws IOException {
long fromTxnId, boolean inProgressOk, boolean forReading) throws IOException {
QuorumCall<AsyncLogger, RemoteEditLogManifest> q =
loggers.getEditLogManifest(fromTxnId);
loggers.getEditLogManifest(fromTxnId, forReading);
Map<AsyncLogger, RemoteEditLogManifest> resps =
loggers.waitForWriteQuorum(q, selectInputStreamsTimeoutMs,
"selectInputStreams");

View File

@ -123,10 +123,12 @@ public interface QJournalProtocol {
/**
* @param jid the journal from which to enumerate edits
* @param sinceTxId the first transaction which the client cares about
* @param forReading whether or not the caller intends to read from the edit
* logs
* @return a list of edit log segments since the given transaction ID.
*/
public GetEditLogManifestResponseProto getEditLogManifest(
String jid, long sinceTxId) throws IOException;
String jid, long sinceTxId, boolean forReading) throws IOException;
/**
* Begin the recovery process for a given segment. See the HDFS-3077

View File

@ -202,7 +202,8 @@ public class QJournalProtocolServerSideTranslatorPB implements QJournalProtocolP
try {
return impl.getEditLogManifest(
request.getJid().getIdentifier(),
request.getSinceTxId());
request.getSinceTxId(),
request.getForReading());
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -228,12 +228,13 @@ public class QJournalProtocolTranslatorPB implements ProtocolMetaInterface,
@Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId) throws IOException {
long sinceTxId, boolean forReading) throws IOException {
try {
return rpcProxy.getEditLogManifest(NULL_CONTROLLER,
GetEditLogManifestRequestProto.newBuilder()
.setJid(convertJournalId(jid))
.setSinceTxId(sinceTxId)
.setForReading(forReading)
.build());
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);

View File

@ -627,14 +627,14 @@ class Journal implements Closeable {
/**
* @see QJournalProtocol#getEditLogManifest(String, long)
*/
public RemoteEditLogManifest getEditLogManifest(long sinceTxId)
throws IOException {
public RemoteEditLogManifest getEditLogManifest(long sinceTxId,
boolean forReading) throws IOException {
// No need to checkRequest() here - anyone may ask for the list
// of segments.
checkFormatted();
RemoteEditLogManifest manifest = new RemoteEditLogManifest(
fjm.getRemoteEditLogs(sinceTxId));
fjm.getRemoteEditLogs(sinceTxId, forReading));
return manifest;
}

View File

@ -175,10 +175,10 @@ class JournalNodeRpcServer implements QJournalProtocol {
@Override
public GetEditLogManifestResponseProto getEditLogManifest(String jid,
long sinceTxId) throws IOException {
long sinceTxId, boolean forReading) throws IOException {
RemoteEditLogManifest manifest = jn.getOrCreateJournal(jid)
.getEditLogManifest(sinceTxId);
.getEditLogManifest(sinceTxId, forReading);
return GetEditLogManifestResponseProto.newBuilder()
.setManifest(PBHelper.convert(manifest))

View File

@ -77,7 +77,7 @@ class BackupJournalManager implements JournalManager {
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk) {
long fromTxnId, boolean inProgressOk, boolean forReading) {
// This JournalManager is never used for input. Therefore it cannot
// return any transactions
}

View File

@ -277,7 +277,7 @@ public class FSEditLog implements LogsPurgeable {
// Safety check: we should never start a segment if there are
// newer txids readable.
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
journalSet.selectInputStreams(streams, segmentTxId, true);
journalSet.selectInputStreams(streams, segmentTxId, true, true);
if (!streams.isEmpty()) {
String error = String.format("Cannot start writing at txid %s " +
"when there is a stream available for read: %s",
@ -939,7 +939,7 @@ public class FSEditLog implements LogsPurgeable {
*/
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId)
throws IOException {
return journalSet.getEditLogManifest(fromTxId);
return journalSet.getEditLogManifest(fromTxId, true);
}
/**
@ -1233,8 +1233,8 @@ public class FSEditLog implements LogsPurgeable {
}
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) {
journalSet.selectInputStreams(streams, fromTxId, inProgressOk);
long fromTxId, boolean inProgressOk, boolean forReading) {
journalSet.selectInputStreams(streams, fromTxId, inProgressOk, forReading);
}
public Collection<EditLogInputStream> selectInputStreams(
@ -1253,7 +1253,7 @@ public class FSEditLog implements LogsPurgeable {
long fromTxId, long toAtLeastTxId, MetaRecoveryContext recovery,
boolean inProgressOk) throws IOException {
List<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
selectInputStreams(streams, fromTxId, inProgressOk);
selectInputStreams(streams, fromTxId, inProgressOk, true);
try {
checkForGaps(streams, fromTxId, toAtLeastTxId, inProgressOk);

View File

@ -164,10 +164,13 @@ public class FileJournalManager implements JournalManager {
/**
* Find all editlog segments starting at or above the given txid.
* @param fromTxId the txnid which to start looking
* @param forReading whether or not the caller intends to read from the edit
* logs
* @return a list of remote edit logs
* @throws IOException if edit logs cannot be listed.
*/
public List<RemoteEditLog> getRemoteEditLogs(long firstTxId) throws IOException {
public List<RemoteEditLog> getRemoteEditLogs(long firstTxId,
boolean forReading) throws IOException {
File currentDir = sd.getCurrentDir();
List<EditLogFile> allLogFiles = matchEditLogs(currentDir);
List<RemoteEditLog> ret = Lists.newArrayListWithCapacity(
@ -177,11 +180,15 @@ public class FileJournalManager implements JournalManager {
if (elf.hasCorruptHeader() || elf.isInProgress()) continue;
if (elf.getFirstTxId() >= firstTxId) {
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
} else if ((firstTxId > elf.getFirstTxId()) &&
(firstTxId <= elf.getLastTxId())) {
// Note that this behavior is different from getLogFiles below.
throw new IllegalStateException("Asked for firstTxId " + firstTxId
+ " which is in the middle of file " + elf.file);
} else if (elf.getFirstTxId() < firstTxId && firstTxId <= elf.getLastTxId()) {
// If the firstTxId is in the middle of an edit log segment
if (forReading) {
// Note that this behavior is different from getLogFiles below.
throw new IllegalStateException("Asked for firstTxId " + firstTxId
+ " which is in the middle of file " + elf.file);
} else {
ret.add(new RemoteEditLog(elf.firstTxId, elf.lastTxId));
}
}
}
@ -242,7 +249,7 @@ public class FileJournalManager implements JournalManager {
@Override
synchronized public void selectInputStreams(
Collection<EditLogInputStream> streams, long fromTxId,
boolean inProgressOk) throws IOException {
boolean inProgressOk, boolean forReading) throws IOException {
List<EditLogFile> elfs = matchEditLogs(sd.getCurrentDir());
LOG.debug(this + ": selecting input streams starting at " + fromTxId +
(inProgressOk ? " (inProgress ok) " : " (excluding inProgress) ") +

View File

@ -235,10 +235,12 @@ public class JournalSet implements JournalManager {
* may not be sorted-- this is up to the caller.
* @param fromTxId The transaction ID to start looking for streams at
* @param inProgressOk Should we consider unfinalized streams?
* @param forReading Whether or not the caller intends to read from
* the returned streams.
*/
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) {
long fromTxId, boolean inProgressOk, boolean forReading) {
final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64,
EDIT_LOG_INPUT_STREAM_COMPARATOR);
@ -248,7 +250,8 @@ public class JournalSet implements JournalManager {
continue;
}
try {
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk);
jas.getManager().selectInputStreams(allStreams, fromTxId, inProgressOk,
forReading);
} catch (IOException ioe) {
LOG.warn("Unable to determine input streams from " + jas.getManager() +
". Skipping.", ioe);
@ -587,14 +590,15 @@ public class JournalSet implements JournalManager {
* @param fromTxId Starting transaction id to read the logs.
* @return RemoteEditLogManifest object.
*/
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId) {
public synchronized RemoteEditLogManifest getEditLogManifest(long fromTxId,
boolean forReading) {
// Collect RemoteEditLogs available from each FileJournalManager
List<RemoteEditLog> allLogs = Lists.newArrayList();
for (JournalAndStream j : journals) {
if (j.getManager() instanceof FileJournalManager) {
FileJournalManager fjm = (FileJournalManager)j.getManager();
try {
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId));
allLogs.addAll(fjm.getRemoteEditLogs(fromTxId, forReading));
} catch (Throwable t) {
LOG.warn("Cannot list edit logs in " + fjm, t);
}

View File

@ -42,12 +42,13 @@ interface LogsPurgeable {
*
* @param fromTxId the first transaction id we want to read
* @param inProgressOk whether or not in-progress streams should be returned
* @param forReading whether or not the caller intends to read from the edit logs
*
* @return a list of streams
* @throws IOException if the underlying storage has an error or is otherwise
* inaccessible
*/
void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) throws IOException;
long fromTxId, boolean inProgressOk, boolean forReading) throws IOException;
}

View File

@ -108,7 +108,7 @@ public class NNStorageRetentionManager {
long purgeLogsFrom = Math.max(0, minimumRequiredTxId - numExtraEditsToRetain);
ArrayList<EditLogInputStream> editLogs = new ArrayList<EditLogInputStream>();
purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false);
purgeableLogs.selectInputStreams(editLogs, purgeLogsFrom, false, false);
Collections.sort(editLogs, new Comparator<EditLogInputStream>() {
@Override
public int compare(EditLogInputStream a, EditLogInputStream b) {

View File

@ -820,7 +820,7 @@ public class SecondaryNameNode implements Runnable {
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxId, boolean inProgressOk) {
long fromTxId, boolean inProgressOk, boolean forReading) {
Iterator<StorageDirectory> iter = storage.dirIterator();
while (iter.hasNext()) {
StorageDirectory dir = iter.next();

View File

@ -169,6 +169,8 @@ message NewEpochResponseProto {
message GetEditLogManifestRequestProto {
required JournalIdProto jid = 1;
required uint64 sinceTxId = 2; // Transaction ID
// Whether or not the client will be reading from the returned streams.
optional bool forReading = 3 [default = true];
}
message GetEditLogManifestResponseProto {

View File

@ -31,6 +31,7 @@ import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
@ -900,6 +901,26 @@ public class TestQuorumJournalManager {
"QJM to \\[127.0.0.1:\\d+, 127.0.0.1:\\d+, 127.0.0.1:\\d+\\]");
}
@Test
public void testSelectInputStreamsNotOnBoundary() throws Exception {
final int txIdsPerSegment = 10;
for (int txid = 1; txid <= 5 * txIdsPerSegment; txid += txIdsPerSegment) {
writeSegment(cluster, qjm, txid, txIdsPerSegment, true);
}
File curDir = cluster.getCurrentDir(0, JID);
GenericTestUtils.assertGlobEquals(curDir, "edits_.*",
NNStorage.getFinalizedEditsFileName(1, 10),
NNStorage.getFinalizedEditsFileName(11, 20),
NNStorage.getFinalizedEditsFileName(21, 30),
NNStorage.getFinalizedEditsFileName(31, 40),
NNStorage.getFinalizedEditsFileName(41, 50));
ArrayList<EditLogInputStream> streams = new ArrayList<EditLogInputStream>();
qjm.selectInputStreams(streams, 25, false, false);
verifyEdits(streams, 25, 50);
}
private QuorumJournalManager createSpyingQJM()
throws IOException, URISyntaxException {

View File

@ -73,7 +73,7 @@ public class TestFileJournalManager {
final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64,
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, fromTxId, inProgressOk);
jm.selectInputStreams(allStreams, fromTxId, inProgressOk, true);
EditLogInputStream elis = null;
try {
while ((elis = allStreams.poll()) != null) {
@ -392,7 +392,7 @@ public class TestFileJournalManager {
final PriorityQueue<EditLogInputStream> allStreams =
new PriorityQueue<EditLogInputStream>(64,
JournalSet.EDIT_LOG_INPUT_STREAM_COMPARATOR);
jm.selectInputStreams(allStreams, txId, inProgressOk);
jm.selectInputStreams(allStreams, txId, inProgressOk, true);
EditLogInputStream elis = null, ret;
try {
while ((elis = allStreams.poll()) != null) {
@ -462,6 +462,6 @@ public class TestFileJournalManager {
private static String getLogsAsString(
FileJournalManager fjm, long firstTxId) throws IOException {
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId));
return Joiner.on(",").join(fjm.getRemoteEditLogs(firstTxId, true));
}
}

View File

@ -170,7 +170,7 @@ public class TestGenericJournalConf {
@Override
public void selectInputStreams(Collection<EditLogInputStream> streams,
long fromTxnId, boolean inProgressOk) {
long fromTxnId, boolean inProgressOk, boolean forReading) {
}
@Override

View File

@ -360,11 +360,12 @@ public class TestNNStorageRetentionManager {
public Void answer(InvocationOnMock invocation) throws Throwable {
Object[] args = invocation.getArguments();
journalSet.selectInputStreams((Collection<EditLogInputStream>)args[0],
(long)((Long)args[1]), (boolean)((Boolean)args[2]));
(long)((Long)args[1]), (boolean)((Boolean)args[2]),
(boolean)((Boolean)args[3]));
return null;
}
}).when(mockLog).selectInputStreams(Mockito.anyCollection(),
Mockito.anyLong(), Mockito.anyBoolean());
Mockito.anyLong(), Mockito.anyBoolean(), Mockito.anyBoolean());
return mockLog;
}
}