# Released under the MIT License. See LICENSE for details.
#
"""Operate on large sets of files efficiently."""

from __future__ import annotations

import logging
from collections import deque
from typing import TYPE_CHECKING
from threading import Condition, Thread
import os

if TYPE_CHECKING:
    from typing import Iterable, Callable


class _FileBatchesRun:
    """State shared between a batch-gathering thread and its consumer.

    The producer (:meth:`bg_thread`) collects paths into lists of up to
    ``batch_size`` entries and appends them to ``batches``. The consumer
    pops them off. All hand-off is coordinated through ``condition``.

    ``done`` is set by the producer when it has finished, and may also
    be set by the consumer to ask the producer to stop early.
    ``errored`` is set by the producer if gathering failed unexpectedly.
    """

    def __init__(
        self,
        paths: list[str],
        batch_size: int,
        file_filter: Callable[[str], bool] | None,
        include_mac_packages: bool = False,
    ) -> None:
        """Set up state for a run; no work happens until bg_thread().

        Args:
            paths: Files and/or directories (recursed) to gather from.
            batch_size: Number of paths at which a batch is submitted.
            file_filter: Optional predicate; only paths for which it
                returns a truthy value are batched. None accepts all.
            include_mac_packages: If True, directories that NSWorkspace
                reports as file packages are batched as single entries
                instead of being recursed into. Requires the Cocoa
                module to be importable.
        """
        self.condition = Condition()
        self.paths = paths
        self.batches = deque[list[str]]()
        self.batch_size = batch_size
        self.done = False
        self.errored = False
        self.file_filter = file_filter

        # Max number of finished batches allowed to queue up before the
        # producer blocks waiting for the consumer.
        self.batch_buffer_size = 5

        self._pending_batch: list[str] = []
        self._include_mac_packages = include_mac_packages

        if self._include_mac_packages:
            # pylint: disable=useless-suppression
            # pylint: disable=no-name-in-module, import-error
            # noinspection PyUnresolvedReferences
            from Cocoa import NSWorkspace  # pyright: ignore

            self._shared_nsworkspace = NSWorkspace.sharedWorkspace()
            # pylint: enable=useless-suppression
        else:
            self._shared_nsworkspace = None

    def _submit_pending_batch(self) -> None:
        """Hand the pending batch to the consumer, blocking while full.

        If ``done`` is found set after waiting, the consumer has gone
        away (the producer itself only sets it after its final submit),
        so the batch is discarded instead of queued.
        """
        assert self._pending_batch

        # Wait until there's room on the list (or we've been marked
        # done), stuff our new results in, and inform any listeners
        # that it has changed.
        with self.condition:
            self.condition.wait_for(
                lambda: len(self.batches) < self.batch_buffer_size
                or self.done
            )
            batch = self._pending_batch
            self._pending_batch = []
            if self.done:
                # Nobody is listening anymore; don't grow the queue.
                return
            self.batches.append(batch)
            self.condition.notify()

    def _possibly_add_to_pending_batch(self, path: str) -> None:
        """Add ``path`` to the pending batch if the filter accepts it.

        Submits the pending batch once it reaches ``batch_size``.
        A filter that raises is logged and the path is skipped.
        """
        if self.file_filter is not None:
            # Keep the try limited to the user callback so failures
            # elsewhere are not misreported as filter errors.
            try:
                accepted = self.file_filter(path)
            except Exception:
                # FIXME: we should translate this into failing overall...
                logging.exception('Error in file_filter')
                return
            if not accepted:
                return

        self._pending_batch.append(path)
        if len(self._pending_batch) >= self.batch_size:
            self._submit_pending_batch()

    def _iter_candidate_paths(self) -> Iterable[str]:
        """Yield every path that should be offered to the filter.

        Plain files in ``paths`` are yielded as-is; directories are
        walked top-down. Paths that are neither are ignored.
        """
        for path in self.paths:
            if os.path.isfile(path):
                yield path
            elif os.path.isdir(path):
                # From os.walk docs: we can prune dirs in-place when
                # running in top-down mode. We can use this to skip
                # diving into mac packages.
                for root, dirs, fnames in os.walk(path, topdown=True):
                    # If we find dirs that are actually mac packages,
                    # pull them out of the dir list we'll dive into and
                    # pass them directly to our batch for processing.
                    if self._include_mac_packages:
                        assert self._shared_nsworkspace is not None
                        # Iterate a copy since we mutate dirs.
                        for dirname in list(dirs):
                            fullpath = os.path.join(root, dirname)
                            if self._shared_nsworkspace.isFilePackageAtPath_(
                                fullpath
                            ):
                                dirs.remove(dirname)
                                yield fullpath

                    for fname in fnames:
                        yield os.path.join(root, fname)

    def bg_thread(self) -> None:
        """Add batches in the bg thread.

        Always marks the run ``done`` and wakes the consumer on the way
        out. Unexpected errors are logged and flagged via ``errored``
        so the consumer can fail instead of waiting forever.
        """
        try:
            # Build batches and push them when they're big enough.
            for candidate in self._iter_candidate_paths():
                if self.done:
                    # Consumer asked us to abort.
                    break
                self._possibly_add_to_pending_batch(candidate)

            # Flush whatever partial batch is left over.
            if self._pending_batch and not self.done:
                self._submit_pending_batch()
        except Exception:
            logging.exception('Error gathering file batches')
            with self.condition:
                self.errored = True
        finally:
            # Tell the world we're done.
            with self.condition:
                self.done = True
                self.condition.notify_all()


def file_batches(
    paths: list[str],
    batch_size: int = 1,
    file_filter: Callable[[str], bool] | None = None,
    include_mac_packages: bool = False,
) -> Iterable[list[str]]:
    """Efficiently yield batches of files to operate on.

    Accepts a list of paths which can be files or directories to be
    recursed. The batch lists are buffered in a background thread so
    time-consuming synchronous operations on the returned batches will
    not slow the gather.

    Args:
        paths: Files and/or directories to gather from.
        batch_size: Number of paths at which a batch is emitted.
        file_filter: Optional predicate selecting which paths to
            include; None includes everything.
        include_mac_packages: Passed through to the gathering run.

    Yields:
        Lists of paths, in the order the background thread queued them.

    Raises:
        RuntimeError: If the background run flagged itself as errored.
    """
    run = _FileBatchesRun(
        paths=paths,
        batch_size=batch_size,
        file_filter=file_filter,
        include_mac_packages=include_mac_packages,
    )

    # Spin up a bg thread to feed us batches.
    thread = Thread(target=run.bg_thread)
    thread.start()

    try:
        # Now spin waiting for new batches to come in or
        # completion/errors.
        while True:
            with run.condition:
                run.condition.wait_for(
                    lambda: run.done or run.errored or run.batches
                )
                if run.errored:
                    raise RuntimeError('BG batch run errored.')

                # Grab everything that's ready and note completion while
                # we hold the lock. The bg thread sets done only after
                # its final submit, so done + drained means finished.
                ready = list(run.batches)
                run.batches.clear()
                finished = run.done

                # Let the bg thread know there is room again.
                run.condition.notify()

            # Yield with the lock released so the bg thread can keep
            # filling the buffer while the caller works on a batch.
            yield from ready

            if finished:
                break
    finally:
        # However we exit (exhaustion, close(), or an error), let the
        # bg thread know to stop waiting on us.
        with run.condition:
            run.done = True
            run.condition.notify()