YARN-8600. RegistryDNS hang when remote lookup does not reply. Contributed by Eric Yang

This commit is contained in:
Shane Kumpf 2018-08-01 12:22:01 -06:00
parent 67c65da261
commit 603a57476c
3 changed files with 63 additions and 5 deletions

View File

@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.registry.server.dns;
import java.util.concurrent.Callable;
import org.xbill.DNS.Lookup;
import org.xbill.DNS.Name;
import org.xbill.DNS.Record;
public class LookupTask implements Callable<Record[]> {
private Name name;
private int type;
public LookupTask(Name name, int type) {
this.name = name;
this.type = type;
}
@Override
public Record[] call() throws Exception {
return new Lookup(name, type).run();
}
}

View File

@ -99,9 +99,13 @@ import java.util.Properties;
import java.util.concurrent.Callable; import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
@ -941,7 +945,7 @@ public class RegistryDNS extends AbstractService implements DNSOperations,
* @param port local port. * @param port local port.
* @throws IOException if the UDP processing fails. * @throws IOException if the UDP processing fails.
*/ */
private void serveNIOUDP(DatagramChannel channel, private synchronized void serveNIOUDP(DatagramChannel channel,
InetAddress addr, int port) throws Exception { InetAddress addr, int port) throws Exception {
SocketAddress remoteAddress = null; SocketAddress remoteAddress = null;
try { try {
@ -1177,13 +1181,20 @@ public class RegistryDNS extends AbstractService implements DNSOperations,
* @return DNS records * @return DNS records
*/ */
protected Record[] getRecords(Name name, int type) { protected Record[] getRecords(Name name, int type) {
Record[] result = null;
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Record[]> future = executor.submit(new LookupTask(name, type));
try { try {
return new Lookup(name, type).run(); result = future.get(1500, TimeUnit.MILLISECONDS);
} catch (NullPointerException | return result;
} catch (InterruptedException | ExecutionException |
TimeoutException | NullPointerException |
ExceptionInInitializerError e) { ExceptionInInitializerError e) {
LOG.error("Fail to lookup: " + name, e); LOG.warn("Failed to lookup: {} type: {}", name, Type.string(type), e);
return result;
} finally {
executor.shutdown();
} }
return null;
} }
/** /**

View File

@ -697,6 +697,14 @@ public class TestRegistryDNS extends Assert {
assertTrue("not an ARecord", recs[0] instanceof ARecord); assertTrue("not an ARecord", recs[0] instanceof ARecord);
assertTrue("not an ARecord", recs[1] instanceof ARecord); assertTrue("not an ARecord", recs[1] instanceof ARecord);
} }
@Test(timeout=5000)
public void testUpstreamFault() throws Exception {
Name name = Name.fromString("19.0.17.172.in-addr.arpa.");
Record[] recs = getRegistryDNS().getRecords(name, Type.CNAME);
assertNull("Record is not null", recs);
}
public RegistryDNS getRegistryDNS() { public RegistryDNS getRegistryDNS() {
return registryDNS; return registryDNS;
} }