# heroweb/lib/tools/deduper.py
# 2024-08-22 12:09:10 +02:00
#
# 79 lines
# 3.0 KiB
# Python

import hashlib
from typing import Dict
import os
from pathlib import Path
class Deduper:
    """Index a directory of files (the "dedupe pool") by MD5 content hash.

    On construction the pool directory is created if missing and walked
    recursively; every non-hidden file is hashed and recorded in
    ``hash_dict``. The lookup helpers then answer whether a given file name
    or file content is already present in the pool.

    Attributes:
        fileobj_path: Resolved, user-expanded ``Path`` of the pool directory.
        hash_dict: Maps an MD5 hex digest to the matching file's path
            relative to ``fileobj_path``.
    """

    def __init__(self, path: str):
        """Create the pool directory if needed and index its current files.

        Args:
            path: Location of the dedupe pool; ``~`` is expanded and the
                path is resolved before use.

        Raises:
            Exception: If two non-hidden files in the pool have the same
                MD5 digest.
        """
        self.fileobj_path = Path(path).expanduser().resolve()
        self.fileobj_path.mkdir(parents=True, exist_ok=True)
        # key is the MD5 hex digest, value is the file path relative to the pool
        self.hash_dict: Dict[str, str] = {}
        # load all the existing files
        for root, _, files in os.walk(self.fileobj_path):
            for file in files:
                file_path = os.path.join(root, file)
                print(f" - load deduped file {file_path}")
                # Files whose name starts with a dot are not indexed.
                if file.startswith('.'):
                    continue
                file_hash = self._calculate_md5(file_path)
                if file_hash in self.hash_dict:
                    raise Exception(f"duplicate in dedupe pool: {file_path}")
                self.hash_dict[file_hash] = os.path.relpath(
                    file_path, self.fileobj_path
                )

    def _calculate_md5(self, file_path: str) -> str:
        """Return the MD5 hex digest of the file at ``file_path``.

        The file is read in 4096-byte chunks so it is never loaded into
        memory as a whole.
        """
        hash_md5 = hashlib.md5()
        with open(file_path, "rb") as f:
            for chunk in iter(lambda: f.read(4096), b""):
                hash_md5.update(chunk)
        return hash_md5.hexdigest()

    # NOTE: a find_or_add_file() helper was sketched here as commented-out
    # code but was never completed; adding files to the pool is not
    # implemented by this class.

    def check_from_name(self, file_name: str) -> str:
        """Look up a pool entry by name, ignoring case.

        Args:
            file_name: Name to compare against the stored pool-relative
                paths.

        Returns:
            The stored pool-relative path of the first entry that matches
            case-insensitively, or ``""`` if there is none.
        """
        wanted = file_name.lower()  # loop-invariant, compute once
        for stored_file_name in self.hash_dict.values():
            if stored_file_name.lower() == wanted:
                return stored_file_name
        return ""

    def check_from_path(self, file_path: str) -> str:
        """Check whether the content of ``file_path`` already is in the pool.

        Args:
            file_path: Path of the file whose content should be looked up.

        Returns:
            The pool-relative path of the entry with the same MD5 digest,
            or ``""`` if the content is not in the pool.

        Raises:
            FileNotFoundError: If ``file_path`` does not exist.
        """
        if not Path(file_path).exists():
            raise FileNotFoundError(f"File '{file_path}' does not exist.")
        file_hash = self._calculate_md5(file_path)
        # Stored values are already relative to the pool directory.
        return self.hash_dict.get(file_hash, "")

    def full_path(self, file_name: str) -> str:
        """Return ``file_name`` appended to the pool directory with a ``/``.

        This is plain string formatting: the result is not checked for
        existence and ``file_name`` is not normalised.
        """
        return f"{self.fileobj_path}/{file_name}"