# Source code for efrotools.filecommand
# Released under the MIT License. See LICENSE for details.
#
"""Operate on large sets of files efficiently."""
from __future__ import annotations
import logging
from collections import deque
from typing import TYPE_CHECKING
from threading import Condition, Thread
import os
if TYPE_CHECKING:
from typing import Iterable, Callable
class _FileBatchesRun:
def __init__(
self,
paths: list[str],
batch_size: int,
file_filter: Callable[[str], bool] | None,
include_mac_packages: bool = False,
) -> None:
self.condition = Condition()
self.paths = paths
self.batches = deque[list[str]]()
self.batch_size = batch_size
self.done = False
self.errored = False
self.file_filter = file_filter
self.batch_buffer_size = 5
self._pending_batch: list[str] = []
self._include_mac_packages = include_mac_packages
if self._include_mac_packages:
# pylint: disable=useless-suppression
# pylint: disable=no-name-in-module, import-error
from Cocoa import NSWorkspace # pyright: ignore
self._shared_nsworkspace = NSWorkspace.sharedWorkspace()
# pylint: enable=useless-suppression
else:
self._shared_nsworkspace = None
def _submit_pending_batch(self) -> None:
assert self._pending_batch
# Wait until there's room on the list (or we've been marked done),
# stuff our new results in, and inform any listeners that it has
# changed.
with self.condition:
self.condition.wait_for(
lambda: len(self.batches) < self.batch_buffer_size or self.done
)
self.batches.append(self._pending_batch)
self._pending_batch = []
self.condition.notify()
def _possibly_add_to_pending_batch(self, path: str) -> None:
try:
if self.file_filter is None or self.file_filter(path):
self._pending_batch.append(path)
if len(self._pending_batch) >= self.batch_size:
self._submit_pending_batch()
except Exception:
# FIXME: we should translate this into failing overall...
logging.exception('Error in file_filter')
def bg_thread(self) -> None:
"""Add batches in the bg thread."""
# pylint: disable=too-many-nested-blocks
# Build batches and push them when they're big enough.
for path in self.paths:
if os.path.isfile(path):
self._possibly_add_to_pending_batch(path)
elif os.path.isdir(path):
# From os.walk docs: we can prune dirs in-place when
# running in top-down mode. We can use this to skip
# diving into mac packages.
for root, dirs, fnames in os.walk(path, topdown=True):
# If we find dirs that are actually mac packages, pull
# them out of the dir list we'll dive into and pass
# them directly to our batch for processing.
if self._include_mac_packages:
assert self._shared_nsworkspace is not None
for dirname in list(dirs):
fullpath = os.path.join(root, dirname)
if self._shared_nsworkspace.isFilePackageAtPath_(
fullpath
):
dirs.remove(dirname)
self._possibly_add_to_pending_batch(fullpath)
for fname in fnames:
fullpath = os.path.join(root, fname)
self._possibly_add_to_pending_batch(fullpath)
if self._pending_batch:
self._submit_pending_batch()
# Tell the world we're done.
with self.condition:
self.done = True
self.condition.notify()
def file_batches(
    paths: list[str],
    batch_size: int = 1,
    file_filter: Callable[[str], bool] | None = None,
    include_mac_packages: bool = False,
) -> Iterable[list[str]]:
    """Efficiently yield batches of files to operate on.

    Accepts a list of paths which can be files or directories to be
    recursed. The batch lists are buffered in a background thread so
    time-consuming synchronous operations on the returned batches will
    not slow the gather.
    """
    run = _FileBatchesRun(
        paths=paths,
        batch_size=batch_size,
        file_filter=file_filter,
        include_mac_packages=include_mac_packages,
    )

    # Kick off the gather in the background; it feeds run.batches.
    gather_thread = Thread(target=run.bg_thread)
    gather_thread.start()

    # Pull batches as they arrive until the run completes or fails.
    while True:
        with run.condition:
            run.condition.wait_for(
                lambda: bool(run.batches) or run.done or run.errored
            )
            try:
                if run.errored:
                    raise RuntimeError('BG batch run errored.')
                while run.batches:
                    yield run.batches.popleft()
                if run.done:
                    return
            except GeneratorExit:
                # Lets the bg thread know to abort.
                run.done = True
                raise
            finally:
                run.condition.notify()
# Docs-generation hack; import some stuff that we likely only forward-declared
# in our actual source code so that docs tools can find it.
from typing import (Coroutine, Any, Literal, Callable,
Generator, Awaitable, Sequence, Self)
import asyncio
from concurrent.futures import Future
from pathlib import Path
from enum import Enum