From 6e3f0e216ea8d5e00ff4934f00059e4ba0a13041 Mon Sep 17 00:00:00 2001 From: Arpit Agarwal Date: Fri, 20 Jun 2014 05:54:46 +0000 Subject: [PATCH] HADOOP-10279: Merging r1604090 from trunk to branch-2. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/branch-2@1604091 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../ipc/WeightedRoundRobinMultiplexer.java | 148 ++++++++++++++++++ .../TestWeightedRoundRobinMultiplexer.java | 142 +++++++++++++++++ 3 files changed, 293 insertions(+) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedRoundRobinMultiplexer.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedRoundRobinMultiplexer.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e62fccbfa0d..4bcca2eeb05 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -91,6 +91,9 @@ Release 2.5.0 - UNRELEASED HADOOP-10557. FsShell -cp -pa option for preserving extended ACLs. (Akira Ajisaka via cnauroth) + HADOOP-10279. Create multiplexer, a requirement for the fair queue. + (Chris Li via Arpit Agarwal) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedRoundRobinMultiplexer.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedRoundRobinMultiplexer.java new file mode 100644 index 00000000000..497ca757461 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WeightedRoundRobinMultiplexer.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +/** + * Determines which queue to start reading from, occasionally drawing from + * low-priority queues in order to prevent starvation. Given the pull pattern + * [9, 4, 1] for 3 queues: + * + * The cycle is (a minimum of) 9+4+1=14 reads. + * Queue 0 is read (at least) 9 times + * Queue 1 is read (at least) 4 times + * Queue 2 is read (at least) 1 time + * Repeat + * + * There may be more reads than the minimum due to race conditions. This is + * allowed by design for performance reasons. + */ +public class WeightedRoundRobinMultiplexer { + // Config keys + public static final String IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY = + "faircallqueue.multiplexer.weights"; + + public static final Log LOG = + LogFactory.getLog(WeightedRoundRobinMultiplexer.class); + + private final int numQueues; // The number of queues under our provisioning + + private final AtomicInteger currentQueueIndex; // Current queue we're serving + private final AtomicInteger requestsLeft; // Number of requests left for this queue + + private int[] queueWeights; // The weights for each queue + + public WeightedRoundRobinMultiplexer(int aNumQueues, String ns, + Configuration conf) { + if (aNumQueues <= 0) { + throw new IllegalArgumentException("Requested queues (" + aNumQueues + + ") must be greater than zero."); + } + + this.numQueues = aNumQueues; + this.queueWeights = conf.getInts(ns + "." + + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY); + + if (this.queueWeights.length == 0) { + this.queueWeights = getDefaultQueueWeights(this.numQueues); + } else if (this.queueWeights.length != this.numQueues) { + throw new IllegalArgumentException(ns + "." + + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY + " must specify exactly " + + this.numQueues + " weights: one for each priority level."); + } + + this.currentQueueIndex = new AtomicInteger(0); + this.requestsLeft = new AtomicInteger(this.queueWeights[0]); + + LOG.info("WeightedRoundRobinMultiplexer is being used."); + } + + /** + * Creates default weights for each queue. The weights are 2^N. + */ + private int[] getDefaultQueueWeights(int aNumQueues) { + int[] weights = new int[aNumQueues]; + + int weight = 1; // Start low + for(int i = aNumQueues - 1; i >= 0; i--) { // Start at lowest queue + weights[i] = weight; + weight *= 2; // Double every iteration + } + return weights; + } + + /** + * Move to the next queue. + */ + private void moveToNextQueue() { + int thisIdx = this.currentQueueIndex.get(); + + // Wrap to fit in our bounds + int nextIdx = (thisIdx + 1) % this.numQueues; + + // Set to next index: once this is called, requests will start being + // drawn from nextIdx, but requestsLeft will continue to decrement into + // the negatives + this.currentQueueIndex.set(nextIdx); + + // Finally, reset requestsLeft. This will enable moveToNextQueue to be + // called again, for the new currentQueueIndex + this.requestsLeft.set(this.queueWeights[nextIdx]); + } + + /** + * Advances the index, which will change the current index + * if called enough times. + */ + private void advanceIndex() { + // Since we did read, we should decrement + int requestsLeftVal = this.requestsLeft.decrementAndGet(); + + // Strict compare with zero (instead of inequality) so that if another + // thread decrements requestsLeft, only one thread will be responsible + // for advancing currentQueueIndex + if (requestsLeftVal == 0) { + // This is guaranteed to be called exactly once per currentQueueIndex + this.moveToNextQueue(); + } + } + + /** + * Gets the current index. Should be accompanied by a call to + * advanceIndex at some point. + */ + private int getCurrentIndex() { + return this.currentQueueIndex.get(); + } + + /** + * Use the mux by getting and advancing index. + */ + public int getAndAdvanceCurrentIndex() { + int idx = this.getCurrentIndex(); + this.advanceIndex(); + return idx; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedRoundRobinMultiplexer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedRoundRobinMultiplexer.java new file mode 100644 index 00000000000..642817617e5 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestWeightedRoundRobinMultiplexer.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; + +import static org.junit.Assert.assertEquals; +import org.junit.Test; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; + +import static org.apache.hadoop.ipc.WeightedRoundRobinMultiplexer.IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY; + +public class TestWeightedRoundRobinMultiplexer { + public static final Log LOG = LogFactory.getLog(TestWeightedRoundRobinMultiplexer.class); + + private WeightedRoundRobinMultiplexer mux; + + @Test(expected=IllegalArgumentException.class) + public void testInstantiateNegativeMux() { + mux = new WeightedRoundRobinMultiplexer(-1, "", new Configuration()); + } + + @Test(expected=IllegalArgumentException.class) + public void testInstantiateZeroMux() { + mux = new WeightedRoundRobinMultiplexer(0, "", new Configuration()); + } + + @Test(expected=IllegalArgumentException.class) + public void testInstantiateIllegalMux() { + Configuration conf = new Configuration(); + conf.setStrings("namespace." + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY, + "1", "2", "3"); + + // ask for 3 weights with 2 queues + mux = new WeightedRoundRobinMultiplexer(2, "namespace", conf); + } + + @Test + public void testLegalInstantiation() { + Configuration conf = new Configuration(); + conf.setStrings("namespace." + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY, + "1", "2", "3"); + + // ask for 3 weights with 3 queues + mux = new WeightedRoundRobinMultiplexer(3, "namespace.", conf); + } + + @Test + public void testDefaultPattern() { + // Mux of size 1: 0 0 0 0 0, etc + mux = new WeightedRoundRobinMultiplexer(1, "", new Configuration()); + for(int i = 0; i < 10; i++) { + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + } + + // Mux of size 2: 0 0 1 0 0 1 0 0 1, etc + mux = new WeightedRoundRobinMultiplexer(2, "", new Configuration()); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + + // Size 3: 4x0 2x1 1x2, etc + mux = new WeightedRoundRobinMultiplexer(3, "", new Configuration()); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 2); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + + // Size 4: 8x0 4x1 2x2 1x3 + mux = new WeightedRoundRobinMultiplexer(4, "", new Configuration()); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 2); + assertEquals(mux.getAndAdvanceCurrentIndex(), 2); + assertEquals(mux.getAndAdvanceCurrentIndex(), 3); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + } + + @Test + public void testCustomPattern() { + // 1x0 1x1 + Configuration conf = new Configuration(); + conf.setStrings("test.custom." + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY, + "1", "1"); + + mux = new WeightedRoundRobinMultiplexer(2, "test.custom", conf); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + + // 1x0 3x1 2x2 + conf.setStrings("test.custom." + IPC_CALLQUEUE_WRRMUX_WEIGHTS_KEY, + "1", "3", "2"); + + mux = new WeightedRoundRobinMultiplexer(3, "test.custom", conf); + + for(int i = 0; i < 5; i++) { + assertEquals(mux.getAndAdvanceCurrentIndex(), 0); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 1); + assertEquals(mux.getAndAdvanceCurrentIndex(), 2); + assertEquals(mux.getAndAdvanceCurrentIndex(), 2); + } // Ensure pattern repeats + + } +} \ No newline at end of file