YARN-8600. RegistryDNS hang when remote lookup does not reply. Contributed by Eric Yang
(cherry picked from commit 603a57476c
)
This commit is contained in:
parent
2a94823f32
commit
62cc373dc5
|
@ -0,0 +1,39 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
package org.apache.hadoop.registry.server.dns;
|
||||||
|
|
||||||
|
import java.util.concurrent.Callable;
|
||||||
|
|
||||||
|
import org.xbill.DNS.Lookup;
|
||||||
|
import org.xbill.DNS.Name;
|
||||||
|
import org.xbill.DNS.Record;
|
||||||
|
|
||||||
|
public class LookupTask implements Callable<Record[]> {
|
||||||
|
|
||||||
|
private Name name;
|
||||||
|
private int type;
|
||||||
|
|
||||||
|
public LookupTask(Name name, int type) {
|
||||||
|
this.name = name;
|
||||||
|
this.type = type;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public Record[] call() throws Exception {
|
||||||
|
return new Lookup(name, type).run();
|
||||||
|
}
|
||||||
|
}
|
|
@ -99,9 +99,13 @@ import java.util.Properties;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
import java.util.concurrent.ConcurrentMap;
|
import java.util.concurrent.ConcurrentMap;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.concurrent.Future;
|
||||||
import java.util.concurrent.ThreadFactory;
|
import java.util.concurrent.ThreadFactory;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.TimeoutException;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
@ -941,7 +945,7 @@ public class RegistryDNS extends AbstractService implements DNSOperations,
|
||||||
* @param port local port.
|
* @param port local port.
|
||||||
* @throws IOException if the UDP processing fails.
|
* @throws IOException if the UDP processing fails.
|
||||||
*/
|
*/
|
||||||
private void serveNIOUDP(DatagramChannel channel,
|
private synchronized void serveNIOUDP(DatagramChannel channel,
|
||||||
InetAddress addr, int port) throws Exception {
|
InetAddress addr, int port) throws Exception {
|
||||||
SocketAddress remoteAddress = null;
|
SocketAddress remoteAddress = null;
|
||||||
try {
|
try {
|
||||||
|
@ -1177,13 +1181,20 @@ public class RegistryDNS extends AbstractService implements DNSOperations,
|
||||||
* @return DNS records
|
* @return DNS records
|
||||||
*/
|
*/
|
||||||
protected Record[] getRecords(Name name, int type) {
|
protected Record[] getRecords(Name name, int type) {
|
||||||
|
Record[] result = null;
|
||||||
|
ExecutorService executor = Executors.newSingleThreadExecutor();
|
||||||
|
Future<Record[]> future = executor.submit(new LookupTask(name, type));
|
||||||
try {
|
try {
|
||||||
return new Lookup(name, type).run();
|
result = future.get(1500, TimeUnit.MILLISECONDS);
|
||||||
} catch (NullPointerException |
|
return result;
|
||||||
|
} catch (InterruptedException | ExecutionException |
|
||||||
|
TimeoutException | NullPointerException |
|
||||||
ExceptionInInitializerError e) {
|
ExceptionInInitializerError e) {
|
||||||
LOG.error("Fail to lookup: " + name, e);
|
LOG.warn("Failed to lookup: {} type: {}", name, Type.string(type), e);
|
||||||
|
return result;
|
||||||
|
} finally {
|
||||||
|
executor.shutdown();
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -697,6 +697,14 @@ public class TestRegistryDNS extends Assert {
|
||||||
assertTrue("not an ARecord", recs[0] instanceof ARecord);
|
assertTrue("not an ARecord", recs[0] instanceof ARecord);
|
||||||
assertTrue("not an ARecord", recs[1] instanceof ARecord);
|
assertTrue("not an ARecord", recs[1] instanceof ARecord);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=5000)
|
||||||
|
public void testUpstreamFault() throws Exception {
|
||||||
|
Name name = Name.fromString("19.0.17.172.in-addr.arpa.");
|
||||||
|
Record[] recs = getRegistryDNS().getRecords(name, Type.CNAME);
|
||||||
|
assertNull("Record is not null", recs);
|
||||||
|
}
|
||||||
|
|
||||||
public RegistryDNS getRegistryDNS() {
|
public RegistryDNS getRegistryDNS() {
|
||||||
return registryDNS;
|
return registryDNS;
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue