mirror of
https://github.com/apache/nifi.git
synced 2025-02-17 23:47:08 +00:00
NIFI-1483 Correcting logic in terms of when local persistence files are removed during the migration process.
Reviewed by Tony Kurc (tkurc@apache.org). This closes #206
This commit is contained in:
parent
ea5818c398
commit
2673370cba
@ -218,12 +218,6 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// delete the local file, since it is no longer needed
|
|
||||||
final File localFile = new File(path);
|
|
||||||
if (localFile.exists() && !localFile.delete()) {
|
|
||||||
getLogger().warn("Migrated state but failed to delete local persistence file");
|
|
||||||
}
|
|
||||||
|
|
||||||
// remove entry from Distributed cache server
|
// remove entry from Distributed cache server
|
||||||
if (client != null) {
|
if (client != null) {
|
||||||
try {
|
try {
|
||||||
@ -285,6 +279,11 @@ public abstract class AbstractListProcessor<T extends ListableEntity> extends Ab
|
|||||||
latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
|
latestIdentifiersListed.addAll(listing.getMatchingIdentifiers());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// delete the local file, since it is no longer needed
|
||||||
|
if (persistenceFile.exists() && !persistenceFile.delete()) {
|
||||||
|
getLogger().warn("Migrated state but failed to delete local persistence file");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (minTimestamp != null) {
|
if (minTimestamp != null) {
|
||||||
|
@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
|
|||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
|
import java.io.FileOutputStream;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -29,6 +30,7 @@ import java.util.List;
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
|
import org.apache.commons.io.Charsets;
|
||||||
import org.apache.nifi.components.PropertyDescriptor;
|
import org.apache.nifi.components.PropertyDescriptor;
|
||||||
import org.apache.nifi.components.state.Scope;
|
import org.apache.nifi.components.state.Scope;
|
||||||
import org.apache.nifi.components.state.StateMap;
|
import org.apache.nifi.components.state.StateMap;
|
||||||
@ -42,10 +44,16 @@ import org.apache.nifi.reporting.InitializationException;
|
|||||||
import org.apache.nifi.state.MockStateManager;
|
import org.apache.nifi.state.MockStateManager;
|
||||||
import org.apache.nifi.util.TestRunner;
|
import org.apache.nifi.util.TestRunner;
|
||||||
import org.apache.nifi.util.TestRunners;
|
import org.apache.nifi.util.TestRunners;
|
||||||
|
import org.junit.Assert;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
|
|
||||||
public class TestAbstractListProcessor {
|
public class TestAbstractListProcessor {
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public final TemporaryFolder testFolder = new TemporaryFolder();
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testOnlyNewEntriesEmitted() {
|
public void testOnlyNewEntriesEmitted() {
|
||||||
final ConcreteListProcessor proc = new ConcreteListProcessor();
|
final ConcreteListProcessor proc = new ConcreteListProcessor();
|
||||||
@ -121,7 +129,7 @@ public class TestAbstractListProcessor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testStateMigrated() throws InitializationException {
|
public void testStateMigratedFromCacheService() throws InitializationException {
|
||||||
final ConcreteListProcessor proc = new ConcreteListProcessor();
|
final ConcreteListProcessor proc = new ConcreteListProcessor();
|
||||||
final TestRunner runner = TestRunners.newTestRunner(proc);
|
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
final DistributedCache cache = new DistributedCache();
|
final DistributedCache cache = new DistributedCache();
|
||||||
@ -142,6 +150,50 @@ public class TestAbstractListProcessor {
|
|||||||
stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
|
stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testNoStateToMigrate() throws Exception {
|
||||||
|
final ConcreteListProcessor proc = new ConcreteListProcessor();
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
final MockStateManager stateManager = runner.getStateManager();
|
||||||
|
final Map<String, String> expectedState = new HashMap<>();
|
||||||
|
stateManager.assertStateEquals(expectedState, Scope.CLUSTER);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testStateMigratedFromLocalFile() throws Exception {
|
||||||
|
final ConcreteListProcessor proc = new ConcreteListProcessor();
|
||||||
|
final TestRunner runner = TestRunners.newTestRunner(proc);
|
||||||
|
|
||||||
|
// Create a file that we will populate with the desired state
|
||||||
|
File persistenceFile = testFolder.newFile(proc.persistenceFilename);
|
||||||
|
// Override the processor's internal persistence file
|
||||||
|
proc.persistenceFile = persistenceFile;
|
||||||
|
|
||||||
|
// Local File persistence was a properties file format of <key>=<JSON entity listing representation>
|
||||||
|
// Our ConcreteListProcessor is centered around files which are provided for a given path
|
||||||
|
final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}";
|
||||||
|
|
||||||
|
// Create a persistence file of the format anticipated
|
||||||
|
try (FileOutputStream fos = new FileOutputStream(persistenceFile);) {
|
||||||
|
fos.write(serviceState.getBytes(Charsets.UTF_8));
|
||||||
|
}
|
||||||
|
|
||||||
|
runner.run();
|
||||||
|
|
||||||
|
// Verify the local persistence file is removed
|
||||||
|
Assert.assertTrue("Failed to remove persistence file", !persistenceFile.exists());
|
||||||
|
|
||||||
|
// Verify the state manager now maintains the associated state
|
||||||
|
final Map<String, String> expectedState = new HashMap<>();
|
||||||
|
expectedState.put(AbstractListProcessor.TIMESTAMP, "1492");
|
||||||
|
expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", "id");
|
||||||
|
|
||||||
|
runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER);
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testFetchOnStart() throws InitializationException {
|
public void testFetchOnStart() throws InitializationException {
|
||||||
final ConcreteListProcessor proc = new ConcreteListProcessor();
|
final ConcreteListProcessor proc = new ConcreteListProcessor();
|
||||||
@ -239,9 +291,13 @@ public class TestAbstractListProcessor {
|
|||||||
private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
|
private static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> {
|
||||||
private final List<ListableEntity> entities = new ArrayList<>();
|
private final List<ListableEntity> entities = new ArrayList<>();
|
||||||
|
|
||||||
|
public final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json";
|
||||||
|
public String persistenceFolder = "target/";
|
||||||
|
public File persistenceFile = new File(persistenceFolder + persistenceFilename);
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected File getPersistenceFile() {
|
protected File getPersistenceFile() {
|
||||||
return new File("target/ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json");
|
return persistenceFile;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void addEntity(final String name, final String identifier, final long timestamp) {
|
public void addEntity(final String name, final String identifier, final long timestamp) {
|
||||||
|
Loading…
x
Reference in New Issue
Block a user