HDFS-6040. fix DFSClient issue without libhadoop.so and some other ShortCircuitShm cleanups (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1573885 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Colin McCabe 2014-03-04 06:35:57 +00:00
parent b44dfd6725
commit c1db6bf511
6 changed files with 144 additions and 13 deletions

View File

@ -81,6 +81,10 @@ public final class DomainSocketWatcher implements Closeable {
*/
private static native void anchorNative();
public static String getLoadingFailureReason() {
return loadingFailureReason;
}
public interface Handler {
/**
* Handles an event on a socket. An event may be the socket becoming
@ -244,7 +248,9 @@ public void close() throws IOException {
lock.lock();
try {
if (closed) return;
LOG.info(this + ": closing");
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": closing");
}
closed = true;
} finally {
lock.unlock();
@ -390,8 +396,10 @@ private void sendCallback(String caller, TreeMap<Integer, Entry> entries,
final Thread watcherThread = new Thread(new Runnable() {
@Override
public void run() {
LOG.info(this + ": starting with interruptCheckPeriodMs = " +
interruptCheckPeriodMs);
if (LOG.isDebugEnabled()) {
LOG.debug(this + ": starting with interruptCheckPeriodMs = " +
interruptCheckPeriodMs);
}
final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
FdSet fdSet = new FdSet();
addNotificationSocket(entries, fdSet);
@ -431,7 +439,9 @@ public void run() {
// toRemove are now empty and processedCond has been notified if it
// needed to be.
if (closed) {
LOG.info(toString() + " thread terminating.");
if (LOG.isDebugEnabled()) {
LOG.debug(toString() + " thread terminating.");
}
return;
}
// Check if someone sent our thread an InterruptedException while we

View File

@ -273,6 +273,9 @@ Release 2.4.0 - UNRELEASED
HDFS-5866. '-maxSize' and '-step' option fail in OfflineImageViewer.
(Akira Ajisaka via wheat9)
HDFS-6040. fix DFSClient issue without libhadoop.so and some other
ShortCircuitShm cleanups (cmccabe)
BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)

View File

@ -165,13 +165,13 @@
<Bug pattern="DM_STRING_CTOR" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
<Method name="create" />
<Class name="org.apache.hadoop.hdfs.client.DfsClientShmManager$EndpointShmManager" />
<Method name="allocSlot" />
<Bug pattern="UL_UNRELEASED_LOCK_EXCEPTION_PATH" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.client.ClientMmapManager" />
<Method name="create" />
<Class name="org.apache.hadoop.hdfs.client.DfsClientShmManager$EndpointShmManager" />
<Method name="allocSlot" />
<Bug pattern="UL_UNRELEASED_LOCK" />
</Match>
<!-- Manually verified to be okay, we want to throw away the top bit here -->

View File

@ -30,7 +30,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
import org.apache.hadoop.util.Shell;
@ -514,7 +513,9 @@ synchronized final public boolean isFull() {
* @return The base address of the slot.
*/
private final long calculateSlotAddress(int slotIdx) {
return this.baseAddress + (slotIdx * BYTES_PER_SLOT);
long offset = slotIdx;
offset *= BYTES_PER_SLOT;
return this.baseAddress + offset;
}
/**
@ -536,7 +537,6 @@ synchronized public final Slot allocAndRegisterSlot(
slot.makeValid();
slots[idx] = slot;
if (LOG.isTraceEnabled()) {
//LOG.trace(this + ": allocAndRegisterSlot " + idx);
LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
StringUtils.getStackTrace(Thread.currentThread()));
}
@ -567,6 +567,14 @@ synchronized public final Slot getSlot(int slotIdx)
*/
synchronized public final Slot registerSlot(int slotIdx,
ExtendedBlockId blockId) throws InvalidRequestException {
if (slotIdx < 0) {
throw new InvalidRequestException(this + ": invalid negative slot " +
"index " + slotIdx);
}
if (slotIdx >= slots.length) {
throw new InvalidRequestException(this + ": invalid slot " +
"index " + slotIdx);
}
if (allocatedSlots.get(slotIdx)) {
throw new InvalidRequestException(this + ": slot " + slotIdx +
" is already in use.");
@ -579,7 +587,6 @@ synchronized public final Slot registerSlot(int slotIdx,
slots[slotIdx] = slot;
allocatedSlots.set(slotIdx, true);
if (LOG.isTraceEnabled()) {
//LOG.trace(this + ": registerSlot " + slotIdx);
LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots +
StringUtils.getStackTrace(Thread.currentThread()));
}

View File

@ -53,6 +53,7 @@
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.ipc.RetriableException;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time;
@ -375,7 +376,8 @@ public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs
this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
this.staleThresholdMs = staleThresholdMs;
DfsClientShmManager shmManager = null;
if (shmInterruptCheckMs > 0) {
if ((shmInterruptCheckMs > 0) &&
(DomainSocketWatcher.getLoadingFailureReason() == null)) {
try {
shmManager = new DfsClientShmManager(shmInterruptCheckMs);
} catch (IOException e) {

View File

@ -0,0 +1,109 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.client;
import java.io.File;
import java.io.FileInputStream;
import java.util.ArrayList;
import java.util.Iterator;
import org.apache.commons.lang.SystemUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.io.nativeio.NativeIO;
import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.ShortCircuitShm;
import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
import org.junit.Assume;
import org.junit.Before;
import org.junit.Test;
import org.junit.Assert;
public class TestShortCircuitShm {
public static final Log LOG = LogFactory.getLog(TestShortCircuitShm.class);
private static final File TEST_BASE =
new File(System.getProperty("test.build.data", "/tmp"));
@Before
public void before() {
Assume.assumeTrue(NativeIO.isAvailable());
Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
}
@Test(timeout=60000)
public void testStartupShutdown() throws Exception {
File path = new File(TEST_BASE, "testStartupShutdown");
path.mkdirs();
SharedFileDescriptorFactory factory =
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
FileInputStream stream =
factory.createDescriptor("testStartupShutdown", 4096);
ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
shm.free();
stream.close();
FileUtil.fullyDelete(path);
}
@Test(timeout=60000)
public void testAllocateSlots() throws Exception {
File path = new File(TEST_BASE, "testAllocateSlots");
path.mkdirs();
SharedFileDescriptorFactory factory =
new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
FileInputStream stream =
factory.createDescriptor("testAllocateSlots", 4096);
ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
int numSlots = 0;
ArrayList<Slot> slots = new ArrayList<Slot>();
while (!shm.isFull()) {
Slot slot = shm.allocAndRegisterSlot(new ExtendedBlockId(123L, "test_bp1"));
slots.add(slot);
numSlots++;
}
LOG.info("allocated " + numSlots + " slots before running out.");
int slotIdx = 0;
for (Iterator<Slot> iter = shm.slotIterator();
iter.hasNext(); ) {
Assert.assertTrue(slots.contains(iter.next()));
}
for (Slot slot : slots) {
Assert.assertFalse(slot.addAnchor());
Assert.assertEquals(slotIdx++, slot.getSlotIdx());
}
for (Slot slot : slots) {
slot.makeAnchorable();
}
for (Slot slot : slots) {
Assert.assertTrue(slot.addAnchor());
}
for (Slot slot : slots) {
slot.removeAnchor();
}
for (Slot slot : slots) {
shm.unregisterSlot(slot.getSlotIdx());
slot.makeInvalid();
}
shm.free();
stream.close();
FileUtil.fullyDelete(path);
}
}