# Released under the MIT License. See LICENSE for details.#"""Thread pool functionality."""from__future__importannotationsimporttimeimportloggingimportthreadingfromtypingimportTYPE_CHECKING,ParamSpecfromconcurrent.futuresimportThreadPoolExecutorifTYPE_CHECKING:fromtypingimportAny,Callablefromconcurrent.futuresimportFutureP=ParamSpec('P')logger=logging.getLogger(__name__)
[docs]classThreadPoolExecutorPlus(ThreadPoolExecutor):"""A ThreadPoolExecutor with additional functionality added."""def__init__(self,max_workers:int|None=None,thread_name_prefix:str='',initializer:Callable[[],None]|None=None,max_no_wait_count:int|None=None,)->None:super().__init__(max_workers=max_workers,thread_name_prefix=thread_name_prefix,initializer=initializer,)self.no_wait_count=0self._max_no_wait_count=(max_no_wait_countifmax_no_wait_countisnotNoneelse50ifmax_workersisNoneelsemax_workers*2)self._last_no_wait_warn_time:float|None=Noneself._no_wait_count_lock=threading.Lock()
[docs]defsubmit_no_wait(self,call:Callable[P,Any],*args:P.args,**keywds:P.kwargs)->None:"""Submit work to the threadpool with no expectation of waiting. Any errors occurring in the passed callable will be logged. This call will block and log a warning if the threadpool reaches its max queued no-wait call count. """# If we're too backlogged, issue a warning and block until we# aren't. We don't bother with the lock here since this can be# slightly inexact. In general we should aim to not hit this# limit but it is good to have backpressure to avoid runaway# queues in cases of network outages/etc.ifself.no_wait_count>self._max_no_wait_count:now=time.monotonic()if(self._last_no_wait_warn_timeisNoneornow-self._last_no_wait_warn_time>10.0):logger.warning('ThreadPoolExecutorPlus hit max no-wait limit of %s;'' blocking.',self._max_no_wait_count,)self._last_no_wait_warn_time=nowwhileself.no_wait_count>self._max_no_wait_count:time.sleep(0.01)fut=self.submit(call,*args,**keywds)withself._no_wait_count_lock:self.no_wait_count+=1fut.add_done_callback(self._no_wait_done)
def_no_wait_done(self,fut:Future)->None:withself._no_wait_count_lock:self.no_wait_count-=1try:fut.result()exceptException:logger.exception('Error in work submitted via submit_no_wait().')