mirror of https://github.com/apache/nifi.git
NIFI-6023: Deprecated the "Distributed Cache Service" property in ListHDFS
Updated exception message thrown when accessing the ListHDFS state to refer to the State Manager rather than the "Distributed Cache Service" Removed usage and references to MockCacheClient from TestListHDFS NIFI-6023: Minor update of the "Distributed Cache Service" description This closes #3313. Signed-off-by: Bryan Bende <bbende@apache.org>
This commit is contained in:
parent
319979f256
commit
b7ca59f568
|
@ -96,10 +96,10 @@ import java.util.regex.Pattern;
|
||||||
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
|
@SeeAlso({GetHDFS.class, FetchHDFS.class, PutHDFS.class})
|
||||||
public class ListHDFS extends AbstractHadoopProcessor {
|
public class ListHDFS extends AbstractHadoopProcessor {
|
||||||
|
|
||||||
|
@Deprecated
|
||||||
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
|
public static final PropertyDescriptor DISTRIBUTED_CACHE_SERVICE = new PropertyDescriptor.Builder()
|
||||||
.name("Distributed Cache Service")
|
.name("Distributed Cache Service")
|
||||||
.description("Specifies the Controller Service that should be used to maintain state about what has been pulled from HDFS so that if a new node "
|
.description("This property is ignored. State will be stored in the " + Scope.LOCAL + " or " + Scope.CLUSTER + " scope by the State Manager based on NiFi's configuration.")
|
||||||
+ "begins pulling data, it won't duplicate all of the work that has been done.")
|
|
||||||
.required(false)
|
.required(false)
|
||||||
.identifiesControllerService(DistributedMapCacheClient.class)
|
.identifiesControllerService(DistributedMapCacheClient.class)
|
||||||
.build();
|
.build();
|
||||||
|
@ -363,7 +363,7 @@ public class ListHDFS extends AbstractHadoopProcessor {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (final IOException ioe) {
|
} catch (final IOException ioe) {
|
||||||
getLogger().error("Failed to retrieve timestamp of last listing from Distributed Cache Service. Will not perform listing until this is accomplished.");
|
getLogger().error("Failed to retrieve timestamp of last listing from the State Manager. Will not perform listing until this is accomplished.");
|
||||||
context.yield();
|
context.yield();
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
|
@ -66,7 +66,6 @@ public class TestListHDFS {
|
||||||
|
|
||||||
private TestRunner runner;
|
private TestRunner runner;
|
||||||
private ListHDFSWithMockedFileSystem proc;
|
private ListHDFSWithMockedFileSystem proc;
|
||||||
private MockCacheClient service;
|
|
||||||
private NiFiProperties mockNiFiProperties;
|
private NiFiProperties mockNiFiProperties;
|
||||||
private KerberosProperties kerberosProperties;
|
private KerberosProperties kerberosProperties;
|
||||||
|
|
||||||
|
@ -79,13 +78,8 @@ public class TestListHDFS {
|
||||||
proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
|
proc = new ListHDFSWithMockedFileSystem(kerberosProperties);
|
||||||
runner = TestRunners.newTestRunner(proc);
|
runner = TestRunners.newTestRunner(proc);
|
||||||
|
|
||||||
service = new MockCacheClient();
|
|
||||||
runner.addControllerService("service", service);
|
|
||||||
runner.enableControllerService(service);
|
|
||||||
|
|
||||||
runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
|
runner.setProperty(ListHDFS.HADOOP_CONFIGURATION_RESOURCES, "src/test/resources/core-site.xml");
|
||||||
runner.setProperty(ListHDFS.DIRECTORY, "/test");
|
runner.setProperty(ListHDFS.DIRECTORY, "/test");
|
||||||
runner.setProperty(ListHDFS.DISTRIBUTED_CACHE_SERVICE, "service");
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -374,9 +368,6 @@ public class TestListHDFS {
|
||||||
// add new file to pull
|
// add new file to pull
|
||||||
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 2000L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
|
proc.fileSystem.addFileStatus(new Path("/test"), new FileStatus(1L, false, 1, 1L, 2000L, 0L, create777(), "owner", "group", new Path("/test/testFile2.txt")));
|
||||||
|
|
||||||
// cause calls to service to fail
|
|
||||||
service.failOnCalls = true;
|
|
||||||
|
|
||||||
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
|
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, true);
|
||||||
|
|
||||||
// Should fail to perform @OnScheduled methods.
|
// Should fail to perform @OnScheduled methods.
|
||||||
|
@ -397,7 +388,6 @@ public class TestListHDFS {
|
||||||
|
|
||||||
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
|
runner.assertAllFlowFilesTransferred(ListHDFS.REL_SUCCESS, 0);
|
||||||
|
|
||||||
service.failOnCalls = false;
|
|
||||||
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);
|
runner.getStateManager().setFailOnStateGet(Scope.CLUSTER, false);
|
||||||
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
|
Thread.sleep(TimeUnit.NANOSECONDS.toMillis(2 * ListHDFS.LISTING_LAG_NANOS));
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue