# Released under the MIT License. See LICENSE for details.
#
"""Workspace related functionality."""
from __future__ import annotations
import os
import sys
import logging
from pathlib import Path
from threading import Thread
from functools import partial
from typing import TYPE_CHECKING
from efro.error import CleanError
import _babase
import bacommon.cloud
from bacommon.transfer import DirectoryManifest
if TYPE_CHECKING:
from typing import Callable
import babase
[docs]
class WorkspaceSubsystem:
"""Subsystem for workspace handling in the app.
Access the single shared instance of this class via the
:attr:`~babase.App.workspaces` attr on the :class:`~babase.App`
class.
"""
def __init__(self) -> None:
pass
def set_active_workspace(
self,
account: babase.AccountV2Handle,
workspaceid: str,
workspacename: str,
on_completed: Callable[[], None],
) -> None:
"""(internal)
:meta private:
"""
# Do our work in a background thread so we don't destroy
# interactivity.
Thread(
target=lambda: self._set_active_workspace_bg(
account=account,
workspaceid=workspaceid,
workspacename=workspacename,
on_completed=on_completed,
),
daemon=True,
).start()
def _errmsg(self, msg: babase.Lstr) -> None:
_babase.screenmessage(msg, color=(1, 0, 0))
_babase.getsimplesound('error').play()
def _successmsg(self, msg: babase.Lstr) -> None:
_babase.screenmessage(msg, color=(0, 1, 0))
_babase.getsimplesound('gunCocking').play()
def _set_active_workspace_bg(
self,
account: babase.AccountV2Handle,
workspaceid: str,
workspacename: str,
on_completed: Callable[[], None],
) -> None:
from babase._language import Lstr
class _SkipSyncError(RuntimeError):
pass
plus = _babase.app.plus
assert plus is not None
set_path = True
wspath = Path(
_babase.get_volatile_data_directory(), 'workspaces', workspaceid
)
try:
# If it seems we're offline, don't even attempt a sync,
# but allow using the previous synced state.
# (is this a good idea?)
if not plus.cloud.is_connected():
raise _SkipSyncError()
manifest = DirectoryManifest.create_from_disk(wspath)
# FIXME: Should implement a way to pass account credentials in
# from the logic thread.
state = bacommon.cloud.WorkspaceFetchState(manifest=manifest)
while True:
with account:
response = plus.cloud.send_message(
bacommon.cloud.WorkspaceFetchMessage(
workspaceid=workspaceid, state=state
)
)
state = response.state
self._handle_deletes(
workspace_dir=wspath, deletes=response.deletes
)
self._handle_downloads_inline(
workspace_dir=wspath,
downloads_inline=response.downloads_inline,
)
if response.done:
# Server only deals in files; let's clean up any
# leftover empty dirs after the dust has cleared.
self._handle_dir_prune_empty(str(wspath))
break
state.iteration += 1
_babase.pushcall(
partial(
self._successmsg,
Lstr(
resource='activatedText',
subs=[('${THING}', workspacename)],
),
),
from_other_thread=True,
)
except _SkipSyncError:
_babase.pushcall(
partial(
self._errmsg,
Lstr(
resource='workspaceSyncReuseText',
subs=[('${WORKSPACE}', workspacename)],
),
),
from_other_thread=True,
)
except CleanError as exc:
# Avoid reusing existing if we fail in the middle; could
# be in wonky state.
set_path = False
_babase.pushcall(
partial(self._errmsg, Lstr(value=str(exc))),
from_other_thread=True,
)
except Exception:
# Ditto.
set_path = False
logging.exception("Error syncing workspace '%s'.", workspacename)
_babase.pushcall(
partial(
self._errmsg,
Lstr(
resource='workspaceSyncErrorText',
subs=[('${WORKSPACE}', workspacename)],
),
),
from_other_thread=True,
)
if set_path and wspath.is_dir():
# Add to Python paths and also to list of stuff to be scanned
# for meta tags.
sys.path.insert(0, str(wspath))
_babase.app.meta.extra_scan_dirs.append(str(wspath))
# Job's done!
_babase.pushcall(on_completed, from_other_thread=True)
def _handle_deletes(self, workspace_dir: Path, deletes: list[str]) -> None:
"""Handle file deletes."""
for fname in deletes:
fname = os.path.join(workspace_dir, fname)
# Server shouldn't be sending us dir paths here.
assert not os.path.isdir(fname)
os.unlink(fname)
def _handle_downloads_inline(
self,
workspace_dir: Path,
downloads_inline: dict[str, bytes],
) -> None:
"""Handle inline file data to be saved to the client."""
for fname, fdata in downloads_inline.items():
fname = os.path.join(workspace_dir, fname)
# If there's a directory where we want our file to go, clear it
# out first. File deletes should have run before this so
# everything under it should be empty and thus killable via rmdir.
if os.path.isdir(fname):
for basename, dirnames, _fn in os.walk(fname, topdown=False):
for dirname in dirnames:
os.rmdir(os.path.join(basename, dirname))
os.rmdir(fname)
dirname = os.path.dirname(fname)
if dirname:
os.makedirs(dirname, exist_ok=True)
with open(fname, 'wb') as outfile:
outfile.write(fdata)
def _handle_dir_prune_empty(self, prunedir: str) -> None:
"""Handle pruning empty directories."""
# Walk the tree bottom-up so we can properly kill recursive empty dirs.
for basename, dirnames, filenames in os.walk(prunedir, topdown=False):
# It seems that child dirs we kill during the walk are still
# listed when the parent dir is visited, so lets make sure
# to only acknowledge still-existing ones.
dirnames = [
d for d in dirnames if os.path.exists(os.path.join(basename, d))
]
if not dirnames and not filenames and basename != prunedir:
os.rmdir(basename)
# Docs-generation hack; import some stuff that we likely only forward-declared
# in our actual source code so that docs tools can find it.
from typing import (Coroutine, Any, Literal, Callable,
Generator, Awaitable, Sequence, Self)
import asyncio
from concurrent.futures import Future
from pathlib import Path
from enum import Enum