From c1db6bf5116780eef8789ef261802f4516f2d125 Mon Sep 17 00:00:00 2001
From: Colin McCabe
Date: Tue, 4 Mar 2014 06:35:57 +0000
Subject: [PATCH] HDFS-6040. fix DFSClient issue without libhadoop.so and some
 other ShortCircuitShm cleanups (cmccabe)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1573885 13f79535-47bb-0310-9956-ffa450edef68
---
 .../hadoop/net/unix/DomainSocketWatcher.java  |  18 ++-
 hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt   |   3 +
 .../dev-support/findbugsExcludeFile.xml       |   8 +-
 .../apache/hadoop/hdfs/ShortCircuitShm.java   |  15 ++-
 .../hadoop/hdfs/client/ShortCircuitCache.java |   4 +-
 .../hdfs/client/TestShortCircuitShm.java      | 109 ++++++++++++++++++
 6 files changed, 144 insertions(+), 13 deletions(-)

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
index e201995ede4..838f78166d4 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/net/unix/DomainSocketWatcher.java
@@ -81,6 +81,10 @@ public final class DomainSocketWatcher implements Closeable {
    */
   private static native void anchorNative();
 
+  public static String getLoadingFailureReason() {
+    return loadingFailureReason;
+  }
+
   public interface Handler {
     /**
      * Handles an event on a socket. An event may be the socket becoming
@@ -244,7 +248,9 @@ public void close() throws IOException {
     lock.lock();
     try {
       if (closed) return;
-      LOG.info(this + ": closing");
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(this + ": closing");
+      }
       closed = true;
     } finally {
       lock.unlock();
@@ -390,8 +396,10 @@ private void sendCallback(String caller, TreeMap<Integer, Entry> entries,
   final Thread watcherThread = new Thread(new Runnable() {
     @Override
     public void run() {
-      LOG.info(this + ": starting with interruptCheckPeriodMs = " +
-          interruptCheckPeriodMs);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(this + ": starting with interruptCheckPeriodMs = " +
+            interruptCheckPeriodMs);
+      }
       final TreeMap<Integer, Entry> entries = new TreeMap<Integer, Entry>();
       FdSet fdSet = new FdSet();
       addNotificationSocket(entries, fdSet);
@@ -431,7 +439,9 @@ public void run() {
           // toRemove are now empty and processedCond has been notified if it
           // needed to be.
           if (closed) {
-            LOG.info(toString() + " thread terminating.");
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(toString() + " thread terminating.");
+            }
             return;
           }
           // Check if someone sent our thread an InterruptedException while we
diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
index 2ea8fc96846..cc039104413 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
+++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt
@@ -273,6 +273,9 @@ Release 2.4.0 - UNRELEASED
     HDFS-5866. '-maxSize' and '-step' option fail in OfflineImageViewer.
     (Akira Ajisaka via wheat9)
 
+    HDFS-6040. fix DFSClient issue without libhadoop.so and some other
+    ShortCircuitShm cleanups (cmccabe)
+
   BREAKDOWN OF HDFS-5698 SUBTASKS AND RELATED JIRAS
 
     HDFS-5717. Save FSImage header in protobuf. (Haohui Mai via jing9)
diff --git a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
index 69f999e6319..fa2c4d4471c 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
+++ b/hadoop-hdfs-project/hadoop-hdfs/dev-support/findbugsExcludeFile.xml
@@ -165,13 +165,13 @@
-
-
+
+
 
-
-
+
+
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java
index 4b8b67acbb2..3d53dbc7daa 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/ShortCircuitShm.java
@@ -30,7 +30,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.InvalidRequestException;
-import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.io.nativeio.NativeIO;
 import org.apache.hadoop.io.nativeio.NativeIO.POSIX;
 import org.apache.hadoop.util.Shell;
@@ -514,7 +513,9 @@ synchronized final public boolean isFull() {
    * @return The base address of the slot.
    */
   private final long calculateSlotAddress(int slotIdx) {
-    return this.baseAddress + (slotIdx * BYTES_PER_SLOT);
+    long offset = slotIdx;
+    offset *= BYTES_PER_SLOT;
+    return this.baseAddress + offset;
   }
 
   /**
@@ -536,7 +537,6 @@ synchronized public final Slot allocAndRegisterSlot(
     slot.makeValid();
     slots[idx] = slot;
     if (LOG.isTraceEnabled()) {
-      //LOG.trace(this + ": allocAndRegisterSlot " + idx);
       LOG.trace(this + ": allocAndRegisterSlot " + idx + ": allocatedSlots=" + allocatedSlots +
           StringUtils.getStackTrace(Thread.currentThread()));
     }
@@ -567,6 +567,14 @@ synchronized public final Slot getSlot(int slotIdx)
    */
   synchronized public final Slot registerSlot(int slotIdx,
       ExtendedBlockId blockId) throws InvalidRequestException {
+    if (slotIdx < 0) {
+      throw new InvalidRequestException(this + ": invalid negative slot " +
+          "index " + slotIdx);
+    }
+    if (slotIdx >= slots.length) {
+      throw new InvalidRequestException(this + ": invalid slot " +
+          "index " + slotIdx);
+    }
     if (allocatedSlots.get(slotIdx)) {
       throw new InvalidRequestException(this + ": slot " + slotIdx +
           " is already in use.");
@@ -579,7 +587,6 @@ synchronized public final Slot registerSlot(int slotIdx,
     slots[slotIdx] = slot;
     allocatedSlots.set(slotIdx, true);
     if (LOG.isTraceEnabled()) {
-      //LOG.trace(this + ": registerSlot " + slotIdx);
       LOG.trace(this + ": registerSlot " + slotIdx + ": allocatedSlots=" + allocatedSlots +
           StringUtils.getStackTrace(Thread.currentThread()));
     }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
index bd2f8382679..32c26d7c1d5 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/client/ShortCircuitCache.java
@@ -53,6 +53,7 @@
 import org.apache.hadoop.io.IOUtils;
 import org.apache.hadoop.ipc.RetriableException;
 import org.apache.hadoop.net.unix.DomainSocket;
+import org.apache.hadoop.net.unix.DomainSocketWatcher;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
@@ -375,7 +376,8 @@ public ShortCircuitCache(int maxTotalSize, long maxNonMmappedEvictableLifespanMs
     this.mmapRetryTimeoutMs = mmapRetryTimeoutMs;
     this.staleThresholdMs = staleThresholdMs;
     DfsClientShmManager shmManager = null;
-    if (shmInterruptCheckMs > 0) {
+    if ((shmInterruptCheckMs > 0) &&
+        (DomainSocketWatcher.getLoadingFailureReason() == null)) {
       try {
         shmManager = new DfsClientShmManager(shmInterruptCheckMs);
       } catch (IOException e) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java
index e69de29bb2d..2ba9ea0a764 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/client/TestShortCircuitShm.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdfs.client;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.commons.lang.SystemUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.io.nativeio.NativeIO;
+import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
+import org.apache.hadoop.hdfs.ExtendedBlockId;
+import org.apache.hadoop.hdfs.ShortCircuitShm;
+import org.apache.hadoop.hdfs.ShortCircuitShm.ShmId;
+import org.apache.hadoop.hdfs.ShortCircuitShm.Slot;
+import org.junit.Assume;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.Assert;
+
+public class TestShortCircuitShm {
+  public static final Log LOG = LogFactory.getLog(TestShortCircuitShm.class);
+
+  private static final File TEST_BASE =
+      new File(System.getProperty("test.build.data", "/tmp"));
+
+  @Before
+  public void before() {
+    Assume.assumeTrue(NativeIO.isAvailable());
+    Assume.assumeTrue(SystemUtils.IS_OS_UNIX);
+  }
+
+  @Test(timeout=60000)
+  public void testStartupShutdown() throws Exception {
+    File path = new File(TEST_BASE, "testStartupShutdown");
+    path.mkdirs();
+    SharedFileDescriptorFactory factory =
+        new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
+    FileInputStream stream =
+        factory.createDescriptor("testStartupShutdown", 4096);
+    ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
+    shm.free();
+    stream.close();
+    FileUtil.fullyDelete(path);
+  }
+
+  @Test(timeout=60000)
+  public void testAllocateSlots() throws Exception {
+    File path = new File(TEST_BASE, "testAllocateSlots");
+    path.mkdirs();
+    SharedFileDescriptorFactory factory =
+        new SharedFileDescriptorFactory("shm_", path.getAbsolutePath());
+    FileInputStream stream =
+        factory.createDescriptor("testAllocateSlots", 4096);
+    ShortCircuitShm shm = new ShortCircuitShm(ShmId.createRandom(), stream);
+    int numSlots = 0;
+    ArrayList<Slot> slots = new ArrayList<Slot>();
+    while (!shm.isFull()) {
+      Slot slot = shm.allocAndRegisterSlot(new ExtendedBlockId(123L, "test_bp1"));
+      slots.add(slot);
+      numSlots++;
+    }
+    LOG.info("allocated " + numSlots + " slots before running out.");
+    int slotIdx = 0;
+    for (Iterator<Slot> iter = shm.slotIterator();
+        iter.hasNext(); ) {
+      Assert.assertTrue(slots.contains(iter.next()));
+    }
+    for (Slot slot : slots) {
+      Assert.assertFalse(slot.addAnchor());
+      Assert.assertEquals(slotIdx++, slot.getSlotIdx());
+    }
+    for (Slot slot : slots) {
+      slot.makeAnchorable();
+    }
+    for (Slot slot : slots) {
+      Assert.assertTrue(slot.addAnchor());
+    }
+    for (Slot slot : slots) {
+      slot.removeAnchor();
+    }
+    for (Slot slot : slots) {
+      shm.unregisterSlot(slot.getSlotIdx());
+      slot.makeInvalid();
+    }
+    shm.free();
+    stream.close();
+    FileUtil.fullyDelete(path);
+  }
+}