# Source code for efro.threadpool
# Released under the MIT License. See LICENSE for details.
#
"""Thread pool functionality."""
from __future__ import annotations
import time
import logging
import threading
from typing import TYPE_CHECKING, ParamSpec
from concurrent.futures import ThreadPoolExecutor
if TYPE_CHECKING:
from typing import Any, Callable
from concurrent.futures import Future
# Parameter specification used to annotate submit_no_wait() so that the
# extra positional/keyword arguments are typed against the signature of
# the callable being submitted.
P = ParamSpec('P')
# Module-level logger; used below for the no-wait backlog warning and for
# reporting errors raised by work submitted via submit_no_wait().
logger = logging.getLogger(__name__)
[docs]
class ThreadPoolExecutorEx(ThreadPoolExecutor):
"""A ThreadPoolExecutor with additional functionality added."""
def __init__(
self,
max_workers: int | None = None,
thread_name_prefix: str = '',
initializer: Callable[[], None] | None = None,
max_no_wait_count: int | None = None,
) -> None:
super().__init__(
max_workers=max_workers,
thread_name_prefix=thread_name_prefix,
initializer=initializer,
)
self.no_wait_count = 0
self._max_no_wait_count = (
max_no_wait_count
if max_no_wait_count is not None
else 50 if max_workers is None else max_workers * 2
)
self._last_no_wait_warn_time: float | None = None
self._no_wait_count_lock = threading.Lock()
[docs]
def submit_no_wait(
self, call: Callable[P, Any], *args: P.args, **keywds: P.kwargs
) -> None:
"""Submit work to the threadpool with no expectation of waiting.
Any errors occurring in the passed callable will be logged. This
call will block and log a warning if the threadpool reaches its
max queued no-wait call count.
"""
# If we're too backlogged, issue a warning and block until we
# aren't. We don't bother with the lock here since this can be
# slightly inexact. In general we should aim to not hit this
# limit but it is good to have backpressure to avoid runaway
# queues in cases of network outages/etc.
if self.no_wait_count > self._max_no_wait_count:
now = time.monotonic()
if (
self._last_no_wait_warn_time is None
or now - self._last_no_wait_warn_time > 10.0
):
logger.warning(
'ThreadPoolExecutorEx hit max no-wait limit of %s;'
' blocking.',
self._max_no_wait_count,
)
self._last_no_wait_warn_time = now
while self.no_wait_count > self._max_no_wait_count:
time.sleep(0.01)
fut = self.submit(call, *args, **keywds)
with self._no_wait_count_lock:
self.no_wait_count += 1
fut.add_done_callback(self._no_wait_done)
def _no_wait_done(self, fut: Future) -> None:
with self._no_wait_count_lock:
self.no_wait_count -= 1
try:
fut.result()
except Exception:
logger.exception('Error in work submitted via submit_no_wait().')
# Docs-generation hack; import some stuff that we likely only forward-declared
# in our actual source code so that docs tools can find it.
from typing import (Coroutine, Any, Literal, Callable,
Generator, Awaitable, Sequence, Self)
import asyncio
from concurrent.futures import Future
from pathlib import Path
from enum import Enum