# Released under the MIT License. See LICENSE for details.#"""Standard snippets that can be pulled into project pcommand scripts.A snippet is a mini-program that directly takes input from stdin and doessome focused task. This module is a repository of common snippets that canbe imported into projects' pcommand script for easy reuse."""from__future__importannotations# Note: import as little as possible here at the module level to keep# launch times fast for small snippets.importosimportsysfrompathlibimportPathfromtypingimportTYPE_CHECKINGifTYPE_CHECKING:importioimportthreadingfromtypingimportAnyfromefro.terminalimportClrBase# Absolute path of the project root.PROJROOT=Path(__file__).resolve().parents[2]# Set of arguments for the currently running command.# Note that, unlike sys.argv, this will not include the script path or# the name of the pcommand; only the arguments *to* the command._g_thread_local_storage:threading.local|None=None# Discovered functions for the current project._g_funcs:dict|None=None# Are we running as a server?_g_batch_server_mode:bool=False
[docs]defpcommand_main(globs:dict[str,Any])->None:"""Main entry point to pcommand scripts. We simply look for all public functions in the provided module globals and call the one corresponding to the first passed arg. """importtypesfromefro.terminalimportClrfromefro.errorimportCleanErrorglobal_g_funcs# pylint: disable=global-statementassert_g_funcsisNone# Nowadays generated pcommand scripts run themselves using the# project virtual environment's Python interpreter# (.venv/bin/pythonX.Y, etc.). This nicely sets up the Python# environment but does not touch PATH, meaning the stuff under# .venv/bin won't get found if we do subprocess.run()/etc.## One way to solve this would be to always do `source# .venv/bin/activate` before running tools/pcommand. This sets PATH# but also seems unwieldy and easy to forget. It's nice to be able# to just run tools/pcommand and assume it'll do the right thing.## So let's go ahead and set up PATH here so tools/pcommand by itself# *does* do the right thing.# Don't do this on Windows; we're not currently using virtual-envs# there for the little bit of tools stuff we support.ifnotsys.platform.startswith('win'):abs_exe_path=Path(sys.executable).absolute()pathparts=abs_exe_path.partsif(len(pathparts)<3orpathparts[-3]!='.venv'orpathparts[-2]!='bin'ornotpathparts[-1].startswith('python')):raiseRuntimeError('Unexpected Python environment;'f' we expect to be running under something like'f" .venv/bin/pythonX.Y; found '{abs_exe_path}'.")cur_paths_str=os.environ.get('PATH')ifcur_paths_strisNone:raiseRuntimeError("'PATH' is not currently set; unexpected.")venv_bin_dir=str(abs_exe_path.parent)# Only add our entry if it's not already there; don't want PATH to# get out of control if we're doing recursive stuff.cur_paths=cur_paths_str.split(':')ifvenv_bin_dirnotincur_paths:os.environ['PATH']=':'.join([venv_bin_dir]+cur_paths)# Build our list of available command functions._g_funcs=dict(((name,obj)forname,objinglobs.items()ifnotname.startswith('_')andname!='pcommand_main'andisinstance(obj,types.FunctionType)))try:_run_pcommand(sys.argv)exceptKeyboardInterruptasexc:print(f'{Clr.RED}{exc}{Clr.RST}')sys.exit(1)exceptCleanErrorasexc:exc.pretty_print()sys.exit(1)
[docs]defget_args()->list[str]:"""Return the args for the current pcommand."""# pylint: disable=unsubscriptable-object, not-an-iterableifnot_g_batch_server_mode:returnsys.argv[2:]# Ok, we're in batch mode. We should have stuffed some args into# thread-local storage.assert_g_thread_local_storageisnotNoneargv:list[str]|None=getattr(_g_thread_local_storage,'argv',None)ifargvisNone:raiseRuntimeError('Thread local args not found where expected.')assertisinstance(argv,list)assertall(isinstance(i,str)foriinargv)returnargv[2:]
[docs]defclr()->type[ClrBase]:"""Like efro.terminal.Clr but for use with pcommand.clientprint(). This properly colorizes or doesn't colorize based on whether the *client* where output will be displayed is running on a terminal. Regular print() output should still use efro.terminal.Clr for this purpose. """importefro.terminalif_g_batch_server_mode:assert_g_thread_local_storageisnotNoneclrtp:type[ClrBase]=_g_thread_local_storage.clrassertissubclass(clrtp,efro.terminal.ClrBase)returnclrtpreturnefro.terminal.Clr
[docs]defclientprint(*args:Any,stderr:bool=False,end:str|None=None)->None:"""Print to client stdout. Note that, in batch mode, the results of all clientprints will show up only after the command completes. In regular mode, clientprint() simply passes through to regular print(). """if_g_batch_server_mode:assert_g_thread_local_storageisnotNoneprint(*args,file=(_g_thread_local_storage.stderrifstderrelse_g_thread_local_storage.stdout),end=end,)else:print(*args,end=end)
def_run_pcommand(sysargv:list[str])->None:"""Run a pcommand given raw sys args."""fromefro.errorimportCleanErrorassert_g_funcsisnotNoneclrtp=clr()error=Falseshow_help=Falseiflen(sysargv)<2:clientprint(f'{clrtp.SRED}Error: Command expected.{clrtp.RST}')show_help=Trueerror=Trueelse:ifsysargv[1]=='help':iflen(sysargv)==2:show_help=Trueelifsysargv[2]notin_g_funcs:raiseCleanError('Invalid help command.')else:docs=_trim_docstring(getattr(_g_funcs[sysargv[2]],'__doc__','<no docs>'))clientprint(f'\n{clrtp.MAG}{clrtp.BLD}'f'pcommand {sysargv[2]}:{clrtp.RST}\n'f'{clrtp.MAG}{docs}{clrtp.RST}\n')elifsysargv[1]in_g_funcs:_g_funcs[sysargv[1]]()else:raiseCleanError(f"Unknown pcommand '{sysargv[1]}'.")ifshow_help:clientprint(f'The {clrtp.MAG}{clrtp.BLD}pcommand{clrtp.RST} script encapsulates'f' a collection of project-related commands.')clientprint(f"Run {clrtp.MAG}{clrtp.BLD}'pcommand [COMMAND] ...'"f'{clrtp.RST} to run a command.')clientprint(f"Run {clrtp.MAG}{clrtp.BLD}'pcommand help [COMMAND]'"f'{clrtp.RST} for full documentation for a command.')clientprint('Available commands:')forfunc,objinsorted(_g_funcs.items()):doc=getattr(obj,'__doc__','').splitlines()[0].strip()clientprint(f'{clrtp.MAG}{func}{clrtp.BLU} - {doc}{clrtp.RST}')iferror:raiseCleanError()
[docs]defenter_batch_server_mode()->None:"""Called by pcommandserver when we start serving."""# (try to avoid importing this in non-batch mode in case it shaves# off a bit of time)importthreading# pylint: disable=global-statementglobal_g_batch_server_mode,_g_thread_local_storageassertnot_g_batch_server_mode_g_batch_server_mode=True# Spin up our thread-local storage.assert_g_thread_local_storageisNone_g_thread_local_storage=threading.local()
[docs]defis_batch()->bool:"""Is the current pcommand running under a batch server? Commands that do things that are unsafe to do in server mode such as chdir should assert that this is not true. """return_g_batch_server_mode
[docs]defrun_client_pcommand(args:list[str],clrtp:type[ClrBase],logpath:str)->tuple[int,str,str]:"""Call a pcommand function as a server. Returns a result code and stdout output. """importioimporttracebackfromefro.errorimportCleanErrorassert_g_batch_server_modeassert_g_thread_local_storageisnotNonewithio.StringIO()asstdout,io.StringIO()asstderr:# Stuff some state into thread-local storage for the handler thread# to access._g_thread_local_storage.stdout=stdout_g_thread_local_storage.stderr=stderr_g_thread_local_storage.argv=args_g_thread_local_storage.clr=clrtptry:_run_pcommand(args)resultcode=0exceptKeyboardInterruptasexc:clientprint(f'{clrtp.RED}{exc}{clrtp.RST}')resultcode=1exceptCleanErrorasexc:exc.pretty_print(file=stderr,clr=clrtp)resultcode=1exceptException:traceback.print_exc(file=stderr)print(f'More error output may be available at {logpath}',file=stderr)resultcode=1stdout_str=stdout.getvalue()stderr_str=stderr.getvalue()returnresultcode,stdout_str,stderr_str
[docs]defdisallow_in_batch()->None:"""Utility call to raise a clean error if running under batch mode."""fromefro.errorimportCleanErrorif_g_batch_server_mode:raiseCleanError('This pcommand does not support batch mode.\n''See docs in efrotools.pcommand if you want to add it.')
def_trim_docstring(docstring:str)->str:"""Trim raw doc-strings for pretty printing. Taken straight from PEP 257. """ifnotdocstring:return''# Convert tabs to spaces (following the normal Python rules) and# split into a list of lines.lines=docstring.expandtabs().splitlines()# Determine minimum indentation (first line doesn't count).indent=sys.maxsizeforlineinlines[1:]:stripped=line.lstrip()ifstripped:indent=min(indent,len(line)-len(stripped))# Remove indentation (first line is special).trimmed=[lines[0].strip()]ifindent<sys.maxsize:forlineinlines[1:]:trimmed.append(line[indent:].rstrip())# Strip off trailing and leading blank lines.whiletrimmedandnottrimmed[-1]:trimmed.pop()whiletrimmedandnottrimmed[0]:trimmed.pop(0)# Return a single string.return'\n'.join(trimmed)