Duplicate Files Finder Script

The following cross-platform Python script finds duplicate files within a directory tree. Files are considered duplicates when they have the same name and content. First, the script walks the directory tree with the standard os.walk() function and stores the MD5 digest of every file it finds. The threading module is used to spawn multiple workers (as many as there are CPU cores) to speed up this process. Second, file names and MD5 digests are compared to find duplicate files, and the results are written to duplicate_files.txt.

The code is simple, type-annotated (checked with mypy) and heavily commented so it can be easily customized. Python 3.9 or greater is required.

import multiprocessing
import threading
import hashlib
import logging
import os
import sys
import time
from queue import Queue, Empty
from pathlib import Path
# Size threshold in bytes: files larger than this are hashed in chunks of
# this many bytes instead of being read into memory in a single call.
CRITIC_SIZE = 100_000_000  # 100 MB.
# Size limit in bytes: files larger than this are reported as omitted
# and are not hashed at all.
MAX_SIZE = 500_000_000  # 500 MB.
def get_file_id(file: Path, digest: str) -> str:
    """
    Return the file ID for the specified file.

    A file ID is composed of the file name (without its directory),
    a `|` separator and the MD5 digest. Duplicate files will have the
    same file ID.

    :param file: Path of the file; only its final component is used.
    :param digest: Hexadecimal MD5 digest of the file's contents.
    :return: The string `"<name>|<digest>"`.
    """
    return f"{file.name}|{digest}"
def safe_print(print_lock: threading.Lock, *args: object) -> None:
    """
    Like `print()`, but thread-safe.

    :param print_lock: Lock shared by every thread that prints; it is
        held for the duration of the call so output does not interleave.
    :param args: Values to print, forwarded unchanged to `print()`.
    """
    with print_lock:
        print(*args)
def get_file_md5(file: Path, size: int) -> str:
    """
    Read the contents of the specified `file` and return
    its MD5 digest.

    :param file: Path of the file to hash.
    :param size: Size of the file in bytes; when it exceeds
        `CRITIC_SIZE` the file is read in `CRITIC_SIZE`-byte chunks,
        otherwise it is read in a single call.
    :return: The hexadecimal MD5 digest of the file's contents.
    :raises OSError: If the file cannot be opened or read.
    """
    h = hashlib.md5()
    # If necessary, read by chunks; -1 means "read everything at once".
    chunk_size = CRITIC_SIZE if size > CRITIC_SIZE else -1
    with file.open("rb") as f:
        # An empty read signals end of file.
        while chunk := f.read(chunk_size):
            h.update(chunk)
    return h.hexdigest()
def worker(task_queue: Queue[Path],
        print_lock: threading.Lock,
        processed_files: list[tuple[Path, str]],
        abort: threading.Event) -> None:
    """
    Process files from the `task_queue` until the queue
    is empty and the `abort` flag is set.

    :param task_queue: Queue of file paths waiting to be processed.
    :param print_lock: Lock passed on to `process_file()` for printing.
    :param processed_files: Shared list passed on to `process_file()`,
        which appends the results to it.
    :param abort: Event that tells the worker to return once there is
        nothing left in the queue.
    """
    while True:
        try:
            # Block briefly instead of polling with get_nowait() so idle
            # workers do not spin and steal CPU time from other threads.
            file = task_queue.get(timeout=0.05)
        except Empty:
            # Only stop when the queue is drained AND no more files
            # are going to be queued.
            if abort.is_set():
                break
            continue
        process_file(file, processed_files, print_lock)
def process_file(file: Path,
                processed_files: list[tuple[Path, str]],
                print_lock: threading.Lock) -> None:
    """
    Calculate the MD5 digest for the specified `file` and append
    it to the `processed_files` list.

    Files larger than `MAX_SIZE` are reported and skipped. Files that
    cannot be inspected or read are logged and skipped, so a single
    bad file never terminates the calling worker.

    :param file: Path of the file to process.
    :param processed_files: List that receives a `(file, digest)` tuple
        when the file is hashed successfully.
    :param print_lock: Lock used to print the "omitted" notice.
    """
    try:
        size = file.stat().st_size
    except OSError as e:
        # E.g. broken symlink, permission denied, or the file vanished
        # between the directory walk and now.
        logging.error("Could not read %s: %s", file, e)
        return
    # Ignore large files.
    if size > MAX_SIZE:
        safe_print(print_lock, file, "omitted (too big).")
        return
    try:
        hexdigest = get_file_md5(file, size)
    except OSError as e:  # IOError is an alias of OSError.
        logging.error("Could not read %s: %s", file, e)
    else:
        processed_files.append((file, hexdigest))
# --- Script entry: hash every file under the given root with a pool of
# --- worker threads, then group files sharing the same name and digest.
start_time: float = time.perf_counter()
logging.basicConfig(filename="duplicate_search.log", level=logging.DEBUG)

try:
    # Where to start searching.
    root = Path(sys.argv[1])
except IndexError:
    print("Missing path.")
    sys.exit(1)

if not root.exists():
    print("Directory", root, "does not exist.")
    sys.exit(1)

# List of processed files.
processed_files: list[tuple[Path, str]] = []
# A queue to communicate between workers.
task_queue: Queue[Path] = Queue()
# This lock ensures worker's prints do not overlap.
print_lock = threading.Lock()
cores = multiprocessing.cpu_count()
logging.info(f"{cores} CPU cores available.")
# List of workers.
threads: list[threading.Thread] = []
# When this event is fired workers will start returning.
abort = threading.Event()

# Spawn one worker per CPU core.
for _ in range(cores):
    t: threading.Thread = threading.Thread(
        target=worker,
        args=(task_queue, print_lock, processed_files, abort)
    )
    t.start()
    threads.append(t)

print("Processing files...")
try:
    # Populate the queue with every file under
    # the `root` directory (recursively).
    for path, dirnames, filenames in os.walk(root):
        for filename in filenames:
            task_queue.put(Path(path, filename))
except KeyboardInterrupt:
    logging.info("Manually stopped.")

# Tell the workers they need to return, then wait for them so that
# `processed_files` is complete before it is read.
abort.set()
for t in threads:
    t.join()

print(len(processed_files), "processed files.")
print("Comparing processed files (might take a while)...")

# Group every processed path by file ID in a single pass (instead of
# calling list.count() for each file, which is quadratic).
files_by_id: dict[str, list[Path]] = {}
for file, digest in processed_files:
    files_by_id.setdefault(get_file_id(file, digest), []).append(file)

# The key is the file ID, the value is a list of
# paths containing every file with that ID within
# the `root` directory. Files with the same ID are duplicates,
# so only IDs seen more than once are kept.
duplicate_files: dict[str, list[Path]] = {
    file_id: files
    for file_id, files in files_by_id.items()
    if len(files) > 1
}

print("Writing results...")
with open("duplicate_files.txt", "w", encoding="utf8") as f:
    for files in duplicate_files.values():
        matches = len(files)
        # Since `files` have duplicate files, then every file
        # in the list has the same name. Just pick the first one.
        f.write(f"{files[0].name} found {matches} times:\n")
        # Write the full path of each duplicate file.
        for file in files:
            f.write(f"{file}\n")

print(len(duplicate_files), "duplicate files found.")
end_time: float = time.perf_counter()
print("Elapsed time:", end_time - start_time, "seconds.")

Download (source + mypy.ini file): duplicate-files-finder.zip.