YARN-2944. InMemorySCMStore can not be instantiated with ReflectionUtils#newInstance. (Chris Trezzo via kasha)

(cherry picked from commit a1bd140964)
This commit is contained in:
Karthik Kambatla 2014-12-17 16:04:16 -08:00
parent 2d832ad2eb
commit 375a1e5971
9 changed files with 165 additions and 58 deletions

View File

@ -202,6 +202,9 @@ Release 2.7.0 - UNRELEASED
YARN-2964. FSLeafQueue#assignContainer - document the reason for using both write and
read locks. (Tsuyoshi Ozawa via kasha)
YARN-2944. InMemorySCMStore can not be instantiated with ReflectionUtils#newInstance.
(Chris Trezzo via kasha)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -152,24 +152,4 @@ public class SharedCacheManager extends CompositeService {
System.exit(-1);
}
}
@Private
@SuppressWarnings("unchecked")
public static AppChecker createAppCheckerService(Configuration conf) {
Class<? extends AppChecker> defaultCheckerClass;
try {
defaultCheckerClass =
(Class<? extends AppChecker>) Class
.forName(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
} catch (Exception e) {
throw new YarnRuntimeException("Invalid default scm app checker class"
+ YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS, e);
}
AppChecker checker =
ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.SCM_APP_CHECKER_CLASS, defaultCheckerClass,
AppChecker.class), conf);
return checker;
}
}

View File

@ -86,6 +86,11 @@ public class InMemorySCMStore extends SCMStore {
private int initialDelayMin;
private int checkPeriodMin;
public InMemorySCMStore() {
super(InMemorySCMStore.class.getName());
}
@VisibleForTesting
public InMemorySCMStore(AppChecker appChecker) {
super(InMemorySCMStore.class.getName(), appChecker);
}
@ -137,18 +142,22 @@ public class InMemorySCMStore extends SCMStore {
@Override
protected void serviceStop() throws Exception {
LOG.info("Shutting down the background thread.");
scheduler.shutdownNow();
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("Gave up waiting for the app check task to shutdown.");
LOG.info("Stopping the " + InMemorySCMStore.class.getSimpleName()
+ " service.");
if (scheduler != null) {
LOG.info("Shutting down the background thread.");
scheduler.shutdownNow();
try {
if (!scheduler.awaitTermination(10, TimeUnit.SECONDS)) {
LOG.warn("Gave up waiting for the app check task to shutdown.");
}
} catch (InterruptedException e) {
LOG.warn(
"The InMemorySCMStore was interrupted while shutting down the "
+ "app check task.", e);
}
} catch (InterruptedException e) {
LOG.warn("The InMemorySCMStore was interrupted while shutting down the "
+ "app check task.", e);
LOG.info("The background thread stopped.");
}
LOG.info("The background thread stopped.");
super.serviceStop();
}

View File

@ -24,11 +24,17 @@ import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Evolving;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.service.CompositeService;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
import com.google.common.annotations.VisibleForTesting;
/**
* An abstract class for the data store used by the shared cache manager
@ -39,13 +45,27 @@ import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
@Evolving
public abstract class SCMStore extends CompositeService {
protected final AppChecker appChecker;
protected AppChecker appChecker;
protected SCMStore(String name, AppChecker appChecker) {
protected SCMStore(String name) {
super(name);
}
@VisibleForTesting
SCMStore(String name, AppChecker appChecker) {
super(name);
this.appChecker = appChecker;
}
@Override
protected void serviceInit(Configuration conf) throws Exception {
if (this.appChecker == null) {
this.appChecker = createAppCheckerService(conf);
}
addService(appChecker);
super.serviceInit(conf);
}
/**
* Add a resource to the shared cache and it's associated filename. The
* resource is identified by a unique key. If the key already exists no action
@ -164,4 +184,30 @@ public abstract class SCMStore extends CompositeService {
@Private
public abstract boolean isResourceEvictable(String key, FileStatus file);
/**
* Create an instance of the AppChecker service via reflection based on the
* {@link YarnConfiguration#SCM_APP_CHECKER_CLASS} parameter.
*
* @param conf
* @return an instance of the AppChecker class
*/
@Private
@SuppressWarnings("unchecked")
public static AppChecker createAppCheckerService(Configuration conf) {
Class<? extends AppChecker> defaultCheckerClass;
try {
defaultCheckerClass =
(Class<? extends AppChecker>) Class
.forName(YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS);
} catch (Exception e) {
throw new YarnRuntimeException("Invalid default scm app checker class"
+ YarnConfiguration.DEFAULT_SCM_APP_CHECKER_CLASS, e);
}
AppChecker checker =
ReflectionUtils.newInstance(conf.getClass(
YarnConfiguration.SCM_APP_CHECKER_CLASS, defaultCheckerClass,
AppChecker.class), conf);
return checker;
}
}

View File

@ -0,0 +1,43 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager;
import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* A dummy app checker class for testing only.
*/
public class DummyAppChecker extends AppChecker {
@Override
@Private
public boolean isApplicationActive(ApplicationId id) throws YarnException {
return false;
}
@Override
@Private
public Collection<ApplicationId> getActiveApplications() throws YarnException {
return new ArrayList<ApplicationId>();
}
}

View File

@ -21,7 +21,7 @@ package org.apache.hadoop.yarn.server.sharedcachemanager;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.io.File;
import java.io.IOException;
@ -84,7 +84,7 @@ public class TestClientSCMProtocolService {
conf.set(YarnConfiguration.SCM_STORE_CLASS,
InMemorySCMStore.class.getName());
conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
AppChecker appChecker = mock(AppChecker.class);
AppChecker appChecker = spy(new DummyAppChecker());
store = new InMemorySCMStore(appChecker);
store.init(conf);
store.start();

View File

@ -22,7 +22,7 @@ import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import java.io.File;
import java.io.IOException;
@ -82,7 +82,7 @@ public class TestSharedCacheUploaderService {
conf.set(YarnConfiguration.SCM_STORE_CLASS,
InMemorySCMStore.class.getName());
conf.set(YarnConfiguration.SHARED_CACHE_ROOT, testDir.getPath());
AppChecker appChecker = mock(AppChecker.class);
AppChecker appChecker = spy(new DummyAppChecker());
store = new InMemorySCMStore(appChecker);
store.init(conf);
store.start();

View File

@ -0,0 +1,41 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.sharedcachemanager.store;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.ReflectionUtils;
import org.junit.Test;
/**
* All test classes that test an SCMStore implementation must extend this class.
*/
public abstract class SCMStoreBaseTest {
/**
* Get the SCMStore implementation class associated with this test class.
*/
abstract Class<? extends SCMStore> getStoreClass();
@Test
public void TestZeroArgConstructor() throws Exception {
// Test that the SCMStore implementation class is compatible with
// ReflectionUtils#newInstance
ReflectionUtils.newInstance(getStoreClass(), new Configuration());
}
}

View File

@ -42,22 +42,26 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.sharedcachemanager.AppChecker;
import org.apache.hadoop.yarn.server.sharedcachemanager.DummyAppChecker;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
public class TestInMemorySCMStore {
public class TestInMemorySCMStore extends SCMStoreBaseTest {
private InMemorySCMStore store;
private AppChecker checker;
@Override
Class<? extends SCMStore> getStoreClass() {
return InMemorySCMStore.class;
}
@Before
public void setup() {
this.checker = spy(new DummyAppChecker());
@ -310,23 +314,4 @@ public class TestInMemorySCMStore {
private ApplicationId createAppId(int id, long timestamp) {
return ApplicationId.newInstance(timestamp, id);
}
class DummyAppChecker extends AppChecker {
@Override
@Private
public boolean isApplicationActive(ApplicationId id) throws YarnException {
// stub
return false;
}
@Override
@Private
public Collection<ApplicationId> getActiveApplications()
throws YarnException {
// stub
return null;
}
}
}