NIFI-212: When comparing .tar files for differences, rather than comparing MD5SUM's of .tar files, ensure that .tar files have the same number of entries and that those entries are identical. This prevents issues that arise if the TAR metadata within the file differs

This commit is contained in:
Mark Payne 2014-12-30 10:00:24 -05:00
parent fed987e778
commit 05cc6f045d
1 changed files with 56 additions and 7 deletions

View File

@ -24,6 +24,9 @@ import java.io.FilenameFilter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import java.util.UUID;
import javax.xml.bind.JAXBContext;
@ -40,6 +43,10 @@ import javax.xml.transform.TransformerFactory;
import javax.xml.transform.dom.DOMSource;
import javax.xml.transform.stream.StreamResult;
import org.apache.commons.compress.archivers.ArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.nifi.cluster.flow.ClusterDataFlow;
import org.apache.nifi.cluster.flow.DaoException;
import org.apache.nifi.cluster.flow.DataFlowDao;
@ -48,16 +55,12 @@ import org.apache.nifi.cluster.protocol.DataFlow;
import org.apache.nifi.cluster.protocol.NodeIdentifier;
import org.apache.nifi.cluster.protocol.StandardDataFlow;
import org.apache.nifi.cluster.protocol.jaxb.message.NodeIdentifierAdapter;
import org.apache.nifi.util.file.FileUtils;
import org.apache.nifi.logging.NiFiLog;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.stream.io.BufferedOutputStream;
import org.apache.nifi.stream.io.ByteArrayInputStream;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.logging.NiFiLog;
import org.apache.commons.compress.archivers.tar.TarArchiveEntry;
import org.apache.commons.compress.archivers.tar.TarArchiveInputStream;
import org.apache.commons.compress.archivers.tar.TarArchiveOutputStream;
import org.apache.nifi.util.file.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
@ -174,14 +177,60 @@ public class DataFlowDaoImpl implements DataFlowDao {
FileUtils.copyFile(restoreFlowStateFile, primaryFlowStateFile, false, false, logger);
} else {
// sync the primary copy with the restore copy
FileUtils.syncWithRestore(primaryFlowStateFile, restoreFlowStateFile, logger);
syncWithRestore(primaryFlowStateFile, restoreFlowStateFile);
}
}
} catch (final IOException | IllegalArgumentException | IllegalStateException | JAXBException ex) {
throw new DaoException(ex);
}
}
private void syncWithRestore(final File primaryFile, final File restoreFile) throws IOException {
try (final FileInputStream primaryFis = new FileInputStream(primaryFile);
final TarArchiveInputStream primaryIn = new TarArchiveInputStream(primaryFis);
final FileInputStream restoreFis = new FileInputStream(restoreFile);
final TarArchiveInputStream restoreIn = new TarArchiveInputStream(restoreFis)) {
final ArchiveEntry primaryEntry = primaryIn.getNextEntry();
final ArchiveEntry restoreEntry = restoreIn.getNextEntry();
if ( primaryEntry == null && restoreEntry == null ) {
return;
}
if ( (primaryEntry == null && restoreEntry != null) || (primaryEntry != null && restoreEntry == null) ) {
throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
}
final byte[] primaryMd5 = calculateMd5(primaryIn);
final byte[] restoreMd5 = calculateMd5(restoreIn);
if ( !Arrays.equals(primaryMd5, restoreMd5) ) {
throw new IllegalStateException(String.format("Primary file '%s' is different than restore file '%s'",
primaryFile.getAbsoluteFile(), restoreFile.getAbsolutePath()));
}
}
}
private byte[] calculateMd5(final InputStream in) throws IOException {
final MessageDigest digest;
try {
digest = MessageDigest.getInstance("MD5");
} catch (final NoSuchAlgorithmException nsae) {
throw new IOException(nsae);
}
int len;
final byte[] buffer = new byte[8192];
while ((len = in.read(buffer)) > -1) {
if (len > 0) {
digest.update(buffer, 0, len);
}
}
return digest.digest();
}
@Override