HADOOP-17099. Replace Guava Predicate with Java8+ Predicate
Signed-off-by: Jonathan Eagles <jeagles@gmail.com>
This commit is contained in:
parent
98fcffe93f
commit
1f71c4ae71
|
@ -123,7 +123,7 @@
|
||||||
<property name="regexp" value="true"/>
|
<property name="regexp" value="true"/>
|
||||||
<property name="illegalPkgs" value="^sun\.[^.]+"/>
|
<property name="illegalPkgs" value="^sun\.[^.]+"/>
|
||||||
<property name="illegalClasses"
|
<property name="illegalClasses"
|
||||||
value="^com\.google\.common\.base\.(Optional|Function), ^com\.google\.common\.collect\.(ImmutableListMultimap)"/>
|
value="^com\.google\.common\.base\.(Optional|Function|Predicate), ^com\.google\.common\.collect\.(ImmutableListMultimap)"/>
|
||||||
</module>
|
</module>
|
||||||
<module name="RedundantImport"/>
|
<module name="RedundantImport"/>
|
||||||
<module name="UnusedImports"/>
|
<module name="UnusedImports"/>
|
||||||
|
|
|
@ -18,8 +18,8 @@
|
||||||
|
|
||||||
package org.apache.hadoop.metrics2.impl;
|
package org.apache.hadoop.metrics2.impl;
|
||||||
|
|
||||||
import com.google.common.base.Predicate;
|
import java.util.function.Predicate;
|
||||||
import com.google.common.collect.Iterables;
|
import java.util.stream.StreamSupport;
|
||||||
import org.apache.hadoop.metrics2.AbstractMetric;
|
import org.apache.hadoop.metrics2.AbstractMetric;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecord;
|
import org.apache.hadoop.metrics2.MetricsRecord;
|
||||||
import org.apache.hadoop.metrics2.MetricsTag;
|
import org.apache.hadoop.metrics2.MetricsTag;
|
||||||
|
@ -65,16 +65,22 @@ public class MetricsRecords {
|
||||||
resourceLimitMetric);
|
resourceLimitMetric);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static MetricsTag getFirstTagByName(MetricsRecord record, String name) {
|
private static MetricsTag getFirstTagByName(MetricsRecord record,
|
||||||
return Iterables.getFirst(Iterables.filter(record.tags(),
|
String name) {
|
||||||
new MetricsTagPredicate(name)), null);
|
if (record.tags() == null) {
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
return record.tags().stream().filter(
|
||||||
|
new MetricsTagPredicate(name)).findFirst().orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static AbstractMetric getFirstMetricByName(
|
private static AbstractMetric getFirstMetricByName(
|
||||||
MetricsRecord record, String name) {
|
MetricsRecord record, String name) {
|
||||||
return Iterables.getFirst(
|
if (record.metrics() == null) {
|
||||||
Iterables.filter(record.metrics(), new AbstractMetricPredicate(name)),
|
return null;
|
||||||
null);
|
}
|
||||||
|
return StreamSupport.stream(record.metrics().spliterator(), false)
|
||||||
|
.filter(new AbstractMetricPredicate(name)).findFirst().orElse(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class MetricsTagPredicate implements Predicate<MetricsTag> {
|
private static class MetricsTagPredicate implements Predicate<MetricsTag> {
|
||||||
|
@ -86,7 +92,7 @@ public class MetricsRecords {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(MetricsTag input) {
|
public boolean test(MetricsTag input) {
|
||||||
return input.name().equals(tagName);
|
return input.name().equals(tagName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -101,7 +107,7 @@ public class MetricsRecords {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean apply(AbstractMetric input) {
|
public boolean test(AbstractMetric input) {
|
||||||
return input.name().equals(metricName);
|
return input.name().equals(metricName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -23,9 +23,7 @@ import java.io.IOException;
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
import java.util.concurrent.atomic.*;
|
import java.util.concurrent.atomic.*;
|
||||||
|
import java.util.stream.StreamSupport;
|
||||||
import javax.annotation.Nullable;
|
|
||||||
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.runner.RunWith;
|
import org.junit.runner.RunWith;
|
||||||
|
|
||||||
|
@ -38,7 +36,6 @@ import org.mockito.stubbing.Answer;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.mockito.Mockito.*;
|
||||||
|
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.base.Supplier;
|
import com.google.common.base.Supplier;
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
|
|
||||||
|
@ -59,7 +56,6 @@ import org.apache.hadoop.metrics2.lib.MetricsRegistry;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
|
||||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -246,13 +242,9 @@ public class TestMetricsSystemImpl {
|
||||||
for (Thread t : threads)
|
for (Thread t : threads)
|
||||||
t.join();
|
t.join();
|
||||||
assertEquals(0L, ms.droppedPubAll.value());
|
assertEquals(0L, ms.droppedPubAll.value());
|
||||||
assertTrue(StringUtils.join("\n", Arrays.asList(results)),
|
assertTrue(String.join("\n", Arrays.asList(results)),
|
||||||
Iterables.all(Arrays.asList(results), new Predicate<String>() {
|
Arrays.asList(results).stream().allMatch(
|
||||||
@Override
|
input -> input.equalsIgnoreCase("Passed")));
|
||||||
public boolean apply(@Nullable String input) {
|
|
||||||
return input.equalsIgnoreCase("Passed");
|
|
||||||
}
|
|
||||||
}));
|
|
||||||
ms.stop();
|
ms.stop();
|
||||||
ms.shutdown();
|
ms.shutdown();
|
||||||
}
|
}
|
||||||
|
@ -482,14 +474,12 @@ public class TestMetricsSystemImpl {
|
||||||
ms.onTimerEvent();
|
ms.onTimerEvent();
|
||||||
verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture());
|
verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture());
|
||||||
List<MetricsRecord> mr = r1.getAllValues();
|
List<MetricsRecord> mr = r1.getAllValues();
|
||||||
Number qSize = Iterables.find(mr.get(1).metrics(),
|
Number qSize = StreamSupport.stream(mr.get(1).metrics().spliterator(),
|
||||||
new Predicate<AbstractMetric>() {
|
false).filter(
|
||||||
@Override
|
input -> {
|
||||||
public boolean apply(@Nullable AbstractMetric input) {
|
assert input != null;
|
||||||
assert input != null;
|
return input.name().equals("Sink_slowSinkQsize");
|
||||||
return input.name().equals("Sink_slowSinkQsize");
|
}).findFirst().get().value();
|
||||||
}
|
|
||||||
}).value();
|
|
||||||
assertEquals(1, qSize);
|
assertEquals(1, qSize);
|
||||||
} finally {
|
} finally {
|
||||||
proceedSignal.countDown();
|
proceedSignal.countDown();
|
||||||
|
|
|
@ -21,9 +21,8 @@ import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.HashMultimap;
|
import com.google.common.collect.HashMultimap;
|
||||||
import com.google.common.collect.Multimap;
|
import com.google.common.collect.Multimap;
|
||||||
import com.google.common.collect.UnmodifiableIterator;
|
import com.google.common.collect.UnmodifiableIterator;
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import com.google.common.collect.Collections2;
|
|
||||||
|
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
@ -40,7 +39,7 @@ import java.util.Collection;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
|
import org.apache.hadoop.hdfs.util.CombinedHostsFileReader;
|
||||||
|
|
||||||
|
@ -82,37 +81,26 @@ public class CombinedHostFileManager extends HostConfigManager {
|
||||||
// If the includes list is empty, act as if everything is in the
|
// If the includes list is empty, act as if everything is in the
|
||||||
// includes list.
|
// includes list.
|
||||||
synchronized boolean isIncluded(final InetSocketAddress address) {
|
synchronized boolean isIncluded(final InetSocketAddress address) {
|
||||||
return emptyInServiceNodeLists || Iterables.any(
|
return emptyInServiceNodeLists || allDNs.get(address.getAddress())
|
||||||
allDNs.get(address.getAddress()),
|
.stream().anyMatch(
|
||||||
new Predicate<DatanodeAdminProperties>() {
|
input -> input.getPort() == 0 ||
|
||||||
public boolean apply(DatanodeAdminProperties input) {
|
input.getPort() == address.getPort());
|
||||||
return input.getPort() == 0 ||
|
|
||||||
input.getPort() == address.getPort();
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized boolean isExcluded(final InetSocketAddress address) {
|
synchronized boolean isExcluded(final InetSocketAddress address) {
|
||||||
return Iterables.any(allDNs.get(address.getAddress()),
|
return allDNs.get(address.getAddress()).stream().anyMatch(
|
||||||
new Predicate<DatanodeAdminProperties>() {
|
input -> input.getAdminState().equals(
|
||||||
public boolean apply(DatanodeAdminProperties input) {
|
AdminStates.DECOMMISSIONED) &&
|
||||||
return input.getAdminState().equals(
|
(input.getPort() == 0 ||
|
||||||
AdminStates.DECOMMISSIONED) &&
|
input.getPort() == address.getPort()));
|
||||||
(input.getPort() == 0 ||
|
|
||||||
input.getPort() == address.getPort());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized String getUpgradeDomain(final InetSocketAddress address) {
|
synchronized String getUpgradeDomain(final InetSocketAddress address) {
|
||||||
Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
|
Iterable<DatanodeAdminProperties> datanode =
|
||||||
allDNs.get(address.getAddress()),
|
allDNs.get(address.getAddress()).stream().filter(
|
||||||
new Predicate<DatanodeAdminProperties>() {
|
input -> (input.getPort() == 0 ||
|
||||||
public boolean apply(DatanodeAdminProperties input) {
|
input.getPort() == address.getPort())).collect(
|
||||||
return (input.getPort() == 0 ||
|
Collectors.toList());
|
||||||
input.getPort() == address.getPort());
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return datanode.iterator().hasNext() ?
|
return datanode.iterator().hasNext() ?
|
||||||
datanode.iterator().next().getUpgradeDomain() : null;
|
datanode.iterator().next().getUpgradeDomain() : null;
|
||||||
}
|
}
|
||||||
|
@ -127,36 +115,22 @@ public class CombinedHostFileManager extends HostConfigManager {
|
||||||
}
|
}
|
||||||
|
|
||||||
Iterable<InetSocketAddress> getExcludes() {
|
Iterable<InetSocketAddress> getExcludes() {
|
||||||
return new Iterable<InetSocketAddress>() {
|
return () -> new HostIterator(
|
||||||
@Override
|
allDNs.entries().stream().filter(
|
||||||
public Iterator<InetSocketAddress> iterator() {
|
entry -> entry.getValue().getAdminState().equals(
|
||||||
return new HostIterator(
|
AdminStates.DECOMMISSIONED)).collect(
|
||||||
Collections2.filter(allDNs.entries(),
|
Collectors.toList()));
|
||||||
new Predicate<java.util.Map.Entry<InetAddress,
|
|
||||||
DatanodeAdminProperties>>() {
|
|
||||||
public boolean apply(java.util.Map.Entry<InetAddress,
|
|
||||||
DatanodeAdminProperties> entry) {
|
|
||||||
return entry.getValue().getAdminState().equals(
|
|
||||||
AdminStates.DECOMMISSIONED);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
));
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
}
|
||||||
|
|
||||||
synchronized long getMaintenanceExpireTimeInMS(
|
synchronized long getMaintenanceExpireTimeInMS(
|
||||||
final InetSocketAddress address) {
|
final InetSocketAddress address) {
|
||||||
Iterable<DatanodeAdminProperties> datanode = Iterables.filter(
|
Iterable<DatanodeAdminProperties> datanode =
|
||||||
allDNs.get(address.getAddress()),
|
allDNs.get(address.getAddress()).stream().filter(
|
||||||
new Predicate<DatanodeAdminProperties>() {
|
input -> input.getAdminState().equals(
|
||||||
public boolean apply(DatanodeAdminProperties input) {
|
|
||||||
return input.getAdminState().equals(
|
|
||||||
AdminStates.IN_MAINTENANCE) &&
|
AdminStates.IN_MAINTENANCE) &&
|
||||||
(input.getPort() == 0 ||
|
(input.getPort() == 0 ||
|
||||||
input.getPort() == address.getPort());
|
input.getPort() == address.getPort())).collect(
|
||||||
}
|
Collectors.toList());
|
||||||
});
|
|
||||||
// if DN isn't set to maintenance state, ignore MaintenanceExpireTimeInMS
|
// if DN isn't set to maintenance state, ignore MaintenanceExpireTimeInMS
|
||||||
// set in the config.
|
// set in the config.
|
||||||
return datanode.iterator().hasNext() ?
|
return datanode.iterator().hasNext() ?
|
||||||
|
|
|
@ -24,7 +24,7 @@ import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
@ -34,8 +34,6 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||||
import org.apache.hadoop.hdfs.server.common.Util;
|
import org.apache.hadoop.hdfs.server.common.Util;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.collect.Collections2;
|
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
|
@ -117,17 +115,14 @@ public class NameNodeResourceChecker {
|
||||||
Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
|
Collection<URI> extraCheckedVolumes = Util.stringCollectionAsURIs(conf
|
||||||
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
|
.getTrimmedStringCollection(DFSConfigKeys.DFS_NAMENODE_CHECKED_VOLUMES_KEY));
|
||||||
|
|
||||||
Collection<URI> localEditDirs = Collections2.filter(
|
Collection<URI> localEditDirs =
|
||||||
FSNamesystem.getNamespaceEditsDirs(conf),
|
FSNamesystem.getNamespaceEditsDirs(conf).stream().filter(
|
||||||
new Predicate<URI>() {
|
input -> {
|
||||||
@Override
|
if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
|
||||||
public boolean apply(URI input) {
|
return true;
|
||||||
if (input.getScheme().equals(NNStorage.LOCAL_URI_SCHEME)) {
|
}
|
||||||
return true;
|
return false;
|
||||||
}
|
}).collect(Collectors.toList());
|
||||||
return false;
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
// Add all the local edits dirs, marking some as required if they are
|
// Add all the local edits dirs, marking some as required if they are
|
||||||
// configured as such.
|
// configured as such.
|
||||||
|
|
|
@ -24,7 +24,7 @@ import java.text.SimpleDateFormat;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
import java.util.Date;
|
import java.util.Date;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.hdfs.DFSUtil;
|
import org.apache.hadoop.hdfs.DFSUtil;
|
||||||
|
@ -38,9 +38,6 @@ import org.apache.hadoop.hdfs.server.namenode.INodeDirectory;
|
||||||
import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
|
import org.apache.hadoop.hdfs.server.namenode.XAttrFeature;
|
||||||
import org.apache.hadoop.hdfs.util.ReadOnlyList;
|
import org.apache.hadoop.hdfs.util.ReadOnlyList;
|
||||||
|
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import com.google.common.collect.Lists;
|
|
||||||
import org.apache.hadoop.security.AccessControlException;
|
import org.apache.hadoop.security.AccessControlException;
|
||||||
|
|
||||||
/** Snapshot of a sub-tree in the namesystem. */
|
/** Snapshot of a sub-tree in the namesystem. */
|
||||||
|
@ -149,20 +146,14 @@ public class Snapshot implements Comparable<byte[]> {
|
||||||
static public class Root extends INodeDirectory {
|
static public class Root extends INodeDirectory {
|
||||||
Root(INodeDirectory other) {
|
Root(INodeDirectory other) {
|
||||||
// Always preserve ACL, XAttr.
|
// Always preserve ACL, XAttr.
|
||||||
super(other, false, Lists.newArrayList(
|
super(other, false, Arrays.asList(other.getFeatures()).stream().filter(
|
||||||
Iterables.filter(Arrays.asList(other.getFeatures()), new Predicate<Feature>() {
|
input -> {
|
||||||
|
|
||||||
@Override
|
|
||||||
public boolean apply(Feature input) {
|
|
||||||
if (AclFeature.class.isInstance(input)
|
if (AclFeature.class.isInstance(input)
|
||||||
|| XAttrFeature.class.isInstance(input)) {
|
|| XAttrFeature.class.isInstance(input)) {
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
return false;
|
return false;
|
||||||
}
|
}).collect(Collectors.toList()).toArray(new Feature[0]));
|
||||||
|
|
||||||
}))
|
|
||||||
.toArray(new Feature[0]));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -74,7 +74,6 @@ import org.apache.hadoop.yarn.util.Times;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.collect.Iterables;
|
import com.google.common.collect.Iterables;
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
|
@ -355,14 +354,9 @@ public class AggregatedLogFormat {
|
||||||
: this.logAggregationContext.getRolledLogsExcludePattern(),
|
: this.logAggregationContext.getRolledLogsExcludePattern(),
|
||||||
candidates, true);
|
candidates, true);
|
||||||
|
|
||||||
Iterable<File> mask =
|
Iterable<File> mask = Iterables.filter(candidates, (input) ->
|
||||||
Iterables.filter(candidates, new Predicate<File>() {
|
!alreadyUploadedLogFiles
|
||||||
@Override
|
.contains(getLogFileMetaData(input)));
|
||||||
public boolean apply(File next) {
|
|
||||||
return !alreadyUploadedLogFiles
|
|
||||||
.contains(getLogFileMetaData(next));
|
|
||||||
}
|
|
||||||
});
|
|
||||||
return Sets.newHashSet(mask);
|
return Sets.newHashSet(mask);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -19,9 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
package org.apache.hadoop.yarn.logaggregation.filecontroller;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import java.io.FileNotFoundException;
|
import java.io.FileNotFoundException;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.io.OutputStream;
|
import java.io.OutputStream;
|
||||||
|
@ -35,7 +33,7 @@ import java.util.LinkedList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
import org.apache.hadoop.classification.InterfaceAudience.Public;
|
||||||
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
import org.apache.hadoop.classification.InterfaceStability.Unstable;
|
||||||
|
@ -532,17 +530,12 @@ public abstract class LogAggregationFileController {
|
||||||
Set<FileStatus> status =
|
Set<FileStatus> status =
|
||||||
new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
|
new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
|
||||||
|
|
||||||
Iterable<FileStatus> mask =
|
status = status.stream().filter(
|
||||||
Iterables.filter(status, new Predicate<FileStatus>() {
|
next -> next.getPath().getName()
|
||||||
@Override
|
.contains(LogAggregationUtils.getNodeString(nodeId))
|
||||||
public boolean apply(FileStatus next) {
|
&& !next.getPath().getName().endsWith(
|
||||||
return next.getPath().getName()
|
LogAggregationUtils.TMP_FILE_SUFFIX)).collect(
|
||||||
.contains(LogAggregationUtils.getNodeString(nodeId))
|
Collectors.toSet());
|
||||||
&& !next.getPath().getName().endsWith(
|
|
||||||
LogAggregationUtils.TMP_FILE_SUFFIX);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
status = Sets.newHashSet(mask);
|
|
||||||
// Normally, we just need to delete one oldest log
|
// Normally, we just need to delete one oldest log
|
||||||
// before we upload a new log.
|
// before we upload a new log.
|
||||||
// If we can not delete the older logs in this cycle,
|
// If we can not delete the older logs in this cycle,
|
||||||
|
|
|
@ -19,9 +19,7 @@
|
||||||
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
|
package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import com.google.common.collect.Sets;
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.FileInputStream;
|
import java.io.FileInputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
@ -43,6 +41,7 @@ import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.apache.commons.lang3.SerializationUtils;
|
import org.apache.commons.lang3.SerializationUtils;
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
|
@ -706,16 +705,12 @@ public class LogAggregationIndexedFileController
|
||||||
public Map<String, Long> parseCheckSumFiles(
|
public Map<String, Long> parseCheckSumFiles(
|
||||||
List<FileStatus> fileList) throws IOException {
|
List<FileStatus> fileList) throws IOException {
|
||||||
Map<String, Long> checkSumFiles = new HashMap<>();
|
Map<String, Long> checkSumFiles = new HashMap<>();
|
||||||
Set<FileStatus> status = new HashSet<FileStatus>(fileList);
|
Set<FileStatus> status =
|
||||||
Iterable<FileStatus> mask =
|
new HashSet<>(fileList).stream().filter(
|
||||||
Iterables.filter(status, new Predicate<FileStatus>() {
|
next -> next.getPath().getName().endsWith(
|
||||||
@Override
|
CHECK_SUM_FILE_SUFFIX)).collect(
|
||||||
public boolean apply(FileStatus next) {
|
Collectors.toSet());
|
||||||
return next.getPath().getName().endsWith(
|
|
||||||
CHECK_SUM_FILE_SUFFIX);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
status = Sets.newHashSet(mask);
|
|
||||||
FileContext fc = null;
|
FileContext fc = null;
|
||||||
for (FileStatus file : status) {
|
for (FileStatus file : status) {
|
||||||
FSDataInputStream checksumFileInputStream = null;
|
FSDataInputStream checksumFileInputStream = null;
|
||||||
|
|
|
@ -28,6 +28,7 @@ import java.util.Set;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
@ -72,8 +73,6 @@ import org.apache.hadoop.yarn.util.Records;
|
||||||
import org.apache.hadoop.yarn.util.Times;
|
import org.apache.hadoop.yarn.util.Times;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Predicate;
|
|
||||||
import com.google.common.collect.Iterables;
|
|
||||||
import com.google.common.collect.Sets;
|
import com.google.common.collect.Sets;
|
||||||
|
|
||||||
|
|
||||||
|
@ -663,16 +662,9 @@ public class AppLogAggregatorImpl implements AppLogAggregator {
|
||||||
.getCurrentUpLoadedFileMeta());
|
.getCurrentUpLoadedFileMeta());
|
||||||
// if any of the previous uploaded logs have been deleted,
|
// if any of the previous uploaded logs have been deleted,
|
||||||
// we need to remove them from alreadyUploadedLogs
|
// we need to remove them from alreadyUploadedLogs
|
||||||
Iterable<String> mask =
|
this.uploadedFileMeta = uploadedFileMeta.stream().filter(
|
||||||
Iterables.filter(uploadedFileMeta, new Predicate<String>() {
|
next -> logValue.getAllExistingFilesMeta().contains(next)).collect(
|
||||||
@Override
|
Collectors.toSet());
|
||||||
public boolean apply(String next) {
|
|
||||||
return logValue.getAllExistingFilesMeta().contains(next);
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
this.uploadedFileMeta = Sets.newHashSet(mask);
|
|
||||||
|
|
||||||
// need to return files uploaded or older-than-retention clean up.
|
// need to return files uploaded or older-than-retention clean up.
|
||||||
return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
|
return Sets.union(logValue.getCurrentUpLoadedFilesPath(),
|
||||||
logValue.getObsoleteRetentionLogFiles());
|
logValue.getObsoleteRetentionLogFiles());
|
||||||
|
|
Loading…
Reference in New Issue