Source code for efrotools.filecommand

# Released under the MIT License. See LICENSE for details.
#
"""Operate on large sets of files efficiently."""

from __future__ import annotations

import logging
from collections import deque
from typing import TYPE_CHECKING
from threading import Condition, Thread
import os

if TYPE_CHECKING:
    from typing import Iterable, Callable


class _FileBatchesRun:
    def __init__(
        self,
        paths: list[str],
        batch_size: int,
        file_filter: Callable[[str], bool] | None,
        include_mac_packages: bool = False,
    ) -> None:
        self.condition = Condition()
        self.paths = paths
        self.batches = deque[list[str]]()
        self.batch_size = batch_size
        self.done = False
        self.errored = False
        self.file_filter = file_filter
        self.batch_buffer_size = 5
        self._pending_batch: list[str] = []
        self._include_mac_packages = include_mac_packages

        if self._include_mac_packages:
            # pylint: disable=useless-suppression
            # pylint: disable=no-name-in-module, import-error
            # noinspection PyUnresolvedReferences
            from Cocoa import NSWorkspace  # pyright: ignore

            self._shared_nsworkspace = NSWorkspace.sharedWorkspace()
            # pylint: enable=useless-suppression
        else:
            self._shared_nsworkspace = None

    def _submit_pending_batch(self) -> None:
        assert self._pending_batch

        # Wait until there's room on the list (or we've been marked done),
        # stuff our new results in, and inform any listeners that it has
        # changed.
        with self.condition:
            self.condition.wait_for(
                lambda: len(self.batches) < self.batch_buffer_size or self.done
            )
            self.batches.append(self._pending_batch)
            self._pending_batch = []
            self.condition.notify()

    def _possibly_add_to_pending_batch(self, path: str) -> None:
        try:
            if self.file_filter is None or self.file_filter(path):
                self._pending_batch.append(path)
                if len(self._pending_batch) >= self.batch_size:
                    self._submit_pending_batch()
        except Exception:
            # FIXME: We should translate this into failing the run
            #  overall (e.g. by setting self.errored).
            logging.exception('Error in file_filter')

    def bg_thread(self) -> None:
        """Add batches in the bg thread."""
        # pylint: disable=too-many-nested-blocks

        # Build batches and push them when they're big enough.
        for path in self.paths:
            if os.path.isfile(path):
                self._possibly_add_to_pending_batch(path)
            elif os.path.isdir(path):
                # From the os.walk docs: we can prune dirs in-place when
                # walking top-down. We use this to skip diving into mac
                # packages (see the standalone sketch after this class).
                for root, dirs, fnames in os.walk(path, topdown=True):
                    # If we find dirs that are actually mac packages, pull
                    # them out of the dir list we'll dive into and pass
                    # them directly to our batch for processing.
                    if self._include_mac_packages:
                        assert self._shared_nsworkspace is not None
                        for dirname in list(dirs):
                            fullpath = os.path.join(root, dirname)
                            if self._shared_nsworkspace.isFilePackageAtPath_(
                                fullpath
                            ):
                                dirs.remove(dirname)
                                self._possibly_add_to_pending_batch(fullpath)

                    for fname in fnames:
                        fullpath = os.path.join(root, fname)
                        self._possibly_add_to_pending_batch(fullpath)

        if self._pending_batch:
            self._submit_pending_batch()

        # Tell the world we're done.
        with self.condition:
            self.done = True
            self.condition.notify()
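

# A minimal standalone sketch (not part of the original module) of the
# os.walk pruning technique used in bg_thread() above: when walking
# top-down, removing names from the 'dirs' list keeps os.walk from
# descending into those directories. The skip-names below are purely
# illustrative; this reuses the module-level 'os' import.
def _example_walk_with_pruning(path: str, skip_names: set[str]) -> list[str]:
    """Collect file paths under 'path', skipping some directory names."""
    found: list[str] = []
    for root, dirs, fnames in os.walk(path, topdown=True):
        # Prune in place; os.walk will not recurse into the removed dirs.
        dirs[:] = [d for d in dirs if d not in skip_names]
        found.extend(os.path.join(root, f) for f in fnames)
    return found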


def file_batches(
    paths: list[str],
    batch_size: int = 1,
    file_filter: Callable[[str], bool] | None = None,
    include_mac_packages: bool = False,
) -> Iterable[list[str]]:
    """Efficiently yield batches of files to operate on.

    Accepts a list of paths which can be files or directories to be
    recursed. The batch lists are buffered in a background thread so
    time-consuming synchronous operations on the returned batches will
    not slow the gather.
    """
    run = _FileBatchesRun(
        paths=paths,
        batch_size=batch_size,
        file_filter=file_filter,
        include_mac_packages=include_mac_packages,
    )

    # Spin up a bg thread to feed us batches.
    thread = Thread(target=run.bg_thread)
    thread.start()

    # Now spin waiting for new batches to come in or completion/errors.
    while True:
        with run.condition:
            run.condition.wait_for(
                lambda: run.done or run.errored or run.batches
            )
            try:
                if run.errored:
                    raise RuntimeError('BG batch run errored.')
                while run.batches:
                    yield run.batches.popleft()
                if run.done:
                    break
            except GeneratorExit:
                # Lets the bg thread know to abort.
                run.done = True
                raise
            finally:
                run.condition.notify()
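

# A minimal usage sketch (not part of the original module). The path,
# batch size, and extension filter below are illustrative assumptions.
def _example_usage() -> None:
    """Print a line for each batch of Python files under a source tree."""
    for batch in file_batches(
        ['/some/src/tree'],
        batch_size=50,
        file_filter=lambda p: p.endswith('.py'),
    ):
        # Each batch holds up to 50 matching paths; any slow work done
        # here overlaps with the background file gather.
        print(f'processing batch of {len(batch)} files')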


# Docs-generation hack; import some stuff that we likely only
# forward-declared in our actual source code so that docs tools can
# find it.
from typing import (
    Coroutine,
    Any,
    Literal,
    Callable,
    Generator,
    Awaitable,
    Sequence,
    Self,
)
import asyncio
from concurrent.futures import Future
from pathlib import Path
from enum import Enum