Duplicate File Finder Script

(Animated demo: /images/duplicate-file-finder-script/duplicate-file-finder-script.gif)

The following cross-platform Python script lets you find duplicate files within a directory tree. Files are considered duplicates when they have the same name and the same content. First, the script walks the directory tree with the standard os.walk() function and stores the MD5 digest of every file it finds; the threading module is used to spawn multiple workers (as many as there are CPU cores) to speed up this step. Second, file names and MD5 digests are compared to find the duplicates, and the results are written to duplicate_files.txt.

The code is simple, type-annotated (checked with mypy) and thoroughly commented, so it can be easily customized. Python 3.9 or later is required.
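
To make the full listing easier to follow, here is a minimal single-threaded sketch of the same idea (no chunked reads, no size limit, no error handling and no worker threads); everything in it reappears, in a more robust form, in the script below:

import hashlib
import os
import sys
from pathlib import Path

root = Path(sys.argv[1])
# Map "name|digest" -> list of paths sharing that name and content.
groups: dict[str, list[Path]] = {}
for path, _dirnames, filenames in os.walk(root):
    for filename in filenames:
        file = Path(path, filename)
        digest = hashlib.md5(file.read_bytes()).hexdigest()
        groups.setdefault(f"{file.name}|{digest}", []).append(file)
for files in groups.values():
    if len(files) > 1:
        print(f"{files[0].name} found {len(files)} times: {files}")

The full script: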

import multiprocessing
import threading
import hashlib
import logging
import os
import sys
import time
from queue import Queue, Empty
from pathlib import Path

# Larger files are read by chunks.
CRITIC_SIZE = 100_000_000  # 100 MB.
# Larger files are ignored.
MAX_SIZE = 500_000_000  # 500 MB.

def get_file_id(file: Path, digest: str) -> str:
    """
    Return the file ID for the specified file.
    A file ID is composed of the file name, a separator
    and the MD5 digest. Duplicate files will have the
    same file ID.
    """
    return f"{file.name}|{digest}"

def safe_print(print_lock: threading.Lock, *args: object) -> None:
    """
    Like `print()`, but thread-safe.
    """
    with print_lock:
        print(*args)

def get_file_md5(file: Path, size: int) -> str:
    """
    Read the contents of the specified `file` and return
    its MD5 digest.
    """
    h = hashlib.md5()
    split = size > CRITIC_SIZE
    with file.open("rb") as f:
        while True:
            # If necessary, read by chunks.
            chunk: bytes = f.read(CRITIC_SIZE if split else -1)
            if chunk:
                h.update(chunk)
            else:
                break
    return h.hexdigest()

def worker(task_queue: Queue[Path],
           print_lock: threading.Lock,
           processed_files: list[tuple[Path, str]],
           abort: threading.Event) -> None:
    """
    Process files from the `task_queue` until the queue
    is empty or the `abort` flag is set.
    """
    while True:
        try:
            file = task_queue.get_nowait()
        except Empty:
            if abort.is_set():
                break
            else:
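                # The main thread may still be filling the queue,
                # so keep polling until `abort` is set.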
                continue
        process_file(file, processed_files, print_lock)
        task_queue.task_done()

def process_file(file: Path,
                 processed_files: list[tuple[Path, str]],
                 print_lock: threading.Lock) -> None:
    """
    Calculate the MD5 digest for the specified `file` and append
    it to the `processed_files` list.
    """
    size = file.stat().st_size
    # Ignore large files.
    if size > MAX_SIZE:
        safe_print(print_lock, file, "omitted (too big).")
        return
    try:
        hexdigest = get_file_md5(file, size)
    except IOError as e:
        logging.error(f"Could not read {file}: {e}")
    else:
        processed_files.append((file, hexdigest))

start_time: float = time.perf_counter()
logging.basicConfig(filename="duplicate_search.log", level=logging.DEBUG)
try:
    # Where to start searching.
    root = Path(sys.argv[1])
except IndexError:
    print("Missing path.")
    sys.exit(1)
if not root.exists():
    print("Directory", root, "does not exist.")
    sys.exit(1)
# List of processed files.
processed_files: list[tuple[Path, str]] = []
# A queue used to hand files from the main thread to the workers.
task_queue: Queue[Path] = Queue()
# This lock ensures workers' prints do not overlap.
print_lock = threading.Lock()
cores = multiprocessing.cpu_count()
logging.info(f"{cores} CPU cores available.")
# List of workers.
threads: list[threading.Thread] = []
# When this event is set, workers will start returning.
abort = threading.Event()
# Spawn one worker per CPU core.
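# Threads (rather than processes) are enough here: file reads release
# the GIL, and hashlib typically releases it as well while hashing
# large buffers, so the hashing work can overlap across cores.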
for _ in range(cores):
    t: threading.Thread = threading.Thread(
        target=worker,
        args=(task_queue, print_lock, processed_files, abort)
    )
    t.start()
    threads.append(t)
print("Processing files...")
try:
    # Populate the queue with every file under
    # the `root` directory (recursively).
    for path, dirnames, filenames in os.walk(root):
        for filename in filenames:
            task_queue.put(Path(path, filename))
except KeyboardInterrupt:
    logging.info("Manually stopped.")
# Tell the workers they need to return.
abort.set()
for t in threads:
    t.join()
print(len(processed_files), "processed files.")
print("Comparing processed files (might take a while)...")
processed_files_ids: list[str] = [
    get_file_id(file, digest)
    for file, digest in processed_files
]
# The key is a file ID; the value is the list of paths of
# every file with that ID found under the `root` directory.
duplicate_files: dict[str, list[Path]] = {}
for (file, _digest), file_id in zip(processed_files, processed_files_ids):
    # Files with the same ID are duplicates.
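    # Note: list.count() inside this loop makes the comparison
    # quadratic; fine for moderate trees, slow for huge ones.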
    matches: int = processed_files_ids.count(file_id)
    if matches > 1:
        files: list[Path]
        try:
            # Use the existing list of matches.
            files = duplicate_files[file_id]
        except KeyError:
            # Create a new one if it's the first match.
            duplicate_files[file_id] = files = []
        files.append(file)
print("Writing results...")
with open("duplicate_files.txt", "w", encoding="utf8") as f:
    for files in duplicate_files.values():
        matches = len(files)
        # Since `files` contains duplicates of the same file, every
        # entry in the list has the same name. Just pick the first one.
        f.write(f"{files[0].name} found {matches} times:\n")
        # Write the full path of each duplicate file.
        for file in files:
            f.write(f"\t{file}\n")
print(len(duplicate_files), "duplicate files found.")
end_time: float = time.perf_counter()
print("Elapsed time:", end_time - start_time, "seconds.")

A root directory must be passed to the script to start searching for duplicate files; the search is recursive. For example, to find duplicate files inside C:\Python310\Lib and its subfolders, run the following in a terminal:

py find_duplicates_files.py C:\Python310\Lib

On macOS and Linux distributions, use python or python3 instead of py.
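
Each entry in duplicate_files.txt lists the shared file name, how many copies were found and the full path of every copy, one per line. For example (the file name and paths below are made up for illustration):

example.txt found 2 times:
    C:\Python310\Lib\example.txt
    C:\Python310\Lib\test\example.txt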

Download (source + mypy.ini file): duplicate-file-finder.zip.