mirror of https://github.com/JoshKarpel/spiel
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
279 lines
9.3 KiB
Python
279 lines
9.3 KiB
Python
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import code
|
|
import datetime
|
|
import importlib.util
|
|
import os
|
|
import sys
|
|
from asyncio import wait
|
|
from contextlib import contextmanager, redirect_stderr, redirect_stdout
|
|
from functools import cached_property, partial
|
|
from pathlib import Path
|
|
from time import monotonic
|
|
from typing import Callable, ClassVar, ContextManager, Iterator, List, Tuple
|
|
|
|
from rich.style import Style
|
|
from rich.text import Text
|
|
from textual import log
|
|
from textual.app import App
|
|
from textual.binding import Binding
|
|
from textual.events import Resize
|
|
from textual.reactive import reactive
|
|
from watchfiles import awatch
|
|
|
|
from spiel.constants import DECK, RELOAD_MESSAGE_TIME_FORMAT
|
|
from spiel.deck import Deck
|
|
from spiel.exceptions import NoDeckFound
|
|
from spiel.screens.deck import DeckScreen
|
|
from spiel.screens.help import HelpScreen
|
|
from spiel.screens.slide import SlideScreen
|
|
from spiel.screens.transition import SlideTransitionScreen
|
|
from spiel.transitions.protocol import Direction
|
|
from spiel.triggers import Triggers
|
|
from spiel.utils import clamp
|
|
from spiel.widgets.slide import SlideWidget
|
|
|
|
|
|
def load_deck(path: Path) -> Deck:
|
|
module_name = "__deck"
|
|
spec = importlib.util.spec_from_file_location(module_name, path)
|
|
|
|
if spec is None:
|
|
raise NoDeckFound(f"{path.resolve()} does not appear to be an importable Python module.")
|
|
|
|
module = importlib.util.module_from_spec(spec)
|
|
sys.modules[module_name] = module
|
|
|
|
loader = spec.loader
|
|
assert loader is not None
|
|
loader.exec_module(module)
|
|
|
|
try:
|
|
deck = getattr(module, DECK)
|
|
except AttributeError:
|
|
raise NoDeckFound(f"The module at {path} does not have an attribute named {DECK}.")
|
|
|
|
if not isinstance(deck, Deck):
|
|
raise NoDeckFound(
|
|
f"The module at {path} has an attribute named {DECK}, but it is a {type(deck).__name__}, not a {Deck.__name__}."
|
|
)
|
|
|
|
return deck
|
|
|
|
|
|
SuspendType = Callable[[], ContextManager[None]]
|
|
|
|
|
|
class SpielApp(App[None]):
|
|
CSS_PATH = "spiel.css"
|
|
BINDINGS: ClassVar[List[Binding | Tuple[str, str, str]]] = [
|
|
Binding("d", "switch_screen('deck')", "Go to the Deck view."),
|
|
Binding("question_mark", "push_screen('help')", "Go to the Help view."),
|
|
Binding("i", "repl", "Switch to the REPL."),
|
|
Binding("p", "screenshot", "Take a screenshot."),
|
|
]
|
|
|
|
deck = reactive(Deck(name="New Deck"))
|
|
current_slide_idx = reactive(0)
|
|
message = reactive(Text(""))
|
|
|
|
def __init__(
|
|
self,
|
|
deck_path: Path,
|
|
watch_path: Path | None = None,
|
|
_show_messages: bool = True,
|
|
_fixed_time: datetime.datetime | None = None,
|
|
_fixed_triggers: Triggers | None = None,
|
|
_enable_transitions: bool = True,
|
|
_slide_refresh_rate: float = 1 / 60,
|
|
) -> None:
|
|
super().__init__()
|
|
|
|
self.deck_path = deck_path
|
|
self.watch_path = watch_path
|
|
|
|
self.show_messages = _show_messages
|
|
self.fixed_time = _fixed_time
|
|
self.fixed_triggers = _fixed_triggers
|
|
self.enable_transitions = _enable_transitions
|
|
self.slide_refresh_rate = _slide_refresh_rate
|
|
|
|
async def on_mount(self) -> None:
|
|
self.deck = load_deck(self.deck_path)
|
|
self.reloader = asyncio.create_task(self.reload())
|
|
|
|
self.install_screen(SlideScreen(), name="slide")
|
|
self.install_screen(DeckScreen(), name="deck")
|
|
self.install_screen(HelpScreen(), name="help")
|
|
await self.push_screen("slide")
|
|
|
|
async def reload(self) -> None:
|
|
if self.watch_path is None:
|
|
return
|
|
|
|
log(f"Watching {self.watch_path} for changes")
|
|
async for changes in awatch(self.watch_path):
|
|
change_msg = "\n ".join([""] + [f"{k.raw_str()}: {v}" for k, v in changes])
|
|
log(f"Reloading deck from {self.deck_path} due to detected file changes:{change_msg}")
|
|
try:
|
|
self.deck = load_deck(self.deck_path)
|
|
self.current_slide_idx = clamp(self.current_slide_idx, 0, len(self.deck))
|
|
self.set_message_temporarily(
|
|
Text(
|
|
f"Reloaded deck at {datetime.datetime.now().strftime(RELOAD_MESSAGE_TIME_FORMAT)}",
|
|
style=Style(dim=True),
|
|
),
|
|
delay=10,
|
|
)
|
|
except Exception as e:
|
|
self.set_message_temporarily(
|
|
Text(
|
|
f"Failed to reload deck at {datetime.datetime.now().strftime(RELOAD_MESSAGE_TIME_FORMAT)} due to: {e}",
|
|
style=Style(color="red"),
|
|
),
|
|
delay=10,
|
|
)
|
|
|
|
def on_resize(self, event: Resize) -> None:
|
|
self.set_message_temporarily(
|
|
message=Text(f"Screen resized to {event.size}", style=Style(dim=True)), delay=2
|
|
)
|
|
|
|
def set_message_temporarily(self, message: Text, delay: float) -> None:
|
|
if not self.show_messages:
|
|
return
|
|
|
|
self.message = message
|
|
|
|
def clear() -> None:
|
|
if self.message is message:
|
|
self.message = Text("")
|
|
|
|
self.set_timer(delay, clear)
|
|
|
|
async def action_next_slide(self) -> None:
|
|
await self.handle_new_slide(self.current_slide_idx + 1, Direction.Next)
|
|
|
|
async def action_prev_slide(self) -> None:
|
|
await self.handle_new_slide(self.current_slide_idx - 1, Direction.Previous)
|
|
|
|
async def handle_new_slide(self, new_slide_idx: int, direction: Direction) -> None:
|
|
new_slide_idx = clamp(new_slide_idx, 0, len(self.deck) - 1)
|
|
|
|
current_slide = self.deck[self.current_slide_idx]
|
|
new_slide = self.deck[new_slide_idx]
|
|
|
|
transition = new_slide.transition or self.deck.default_transition
|
|
|
|
if (
|
|
self.current_slide_idx == new_slide_idx
|
|
or not isinstance(self.screen, SlideScreen)
|
|
or transition is None
|
|
or not self.enable_transitions
|
|
):
|
|
self.current_slide_idx = new_slide_idx
|
|
return
|
|
|
|
transition_screen = SlideTransitionScreen(
|
|
from_slide=current_slide,
|
|
from_triggers=self.query_one(SlideWidget).triggers,
|
|
to_slide=new_slide,
|
|
direction=direction,
|
|
transition=transition,
|
|
)
|
|
await self.switch_screen(transition_screen)
|
|
transition_screen.animate(
|
|
"progress",
|
|
value=100,
|
|
delay=0,
|
|
duration=0.75,
|
|
on_complete=lambda: self.finalize_transition(new_slide_idx),
|
|
)
|
|
|
|
async def finalize_transition(self, new_slide_idx: int) -> None:
|
|
await self.switch_screen("slide")
|
|
|
|
self.current_slide_idx = new_slide_idx
|
|
|
|
def action_next_row(self) -> None:
|
|
self.current_slide_idx = clamp(
|
|
self.current_slide_idx + self.deck_grid_width, 0, len(self.deck) - 1
|
|
)
|
|
|
|
def action_prev_row(self) -> None:
|
|
self.current_slide_idx = clamp(
|
|
self.current_slide_idx - self.deck_grid_width, 0, len(self.deck) - 1
|
|
)
|
|
|
|
def watch_deck(self, new_deck: Deck) -> None:
|
|
self.title = new_deck.name
|
|
|
|
def watch_current_slide_idx(self, new_current_slide_idx: int) -> None:
|
|
self.query_one(SlideWidget).triggers = self.fixed_triggers or Triggers.new()
|
|
self.sub_title = self.deck[new_current_slide_idx].title
|
|
|
|
def action_trigger(self) -> None:
|
|
now = monotonic()
|
|
slide_widget = self.query_one(SlideWidget)
|
|
slide_widget.triggers = Triggers(now=now, _times=(*slide_widget.triggers._times, now))
|
|
|
|
def action_reset_trigger(self) -> None:
|
|
slide_widget = self.query_one(SlideWidget)
|
|
slide_widget.triggers = Triggers.new()
|
|
|
|
@cached_property
|
|
def repl(self) -> Callable[[], None]:
|
|
# Lazily enable readline support
|
|
try:
|
|
import readline # noqa: F401
|
|
except ImportError:
|
|
pass
|
|
|
|
self.console.clear() # clear the console the first time we go into the repl
|
|
sys.stdout.flush()
|
|
|
|
repl = code.InteractiveConsole()
|
|
return partial(repl.interact, banner="", exitmsg="")
|
|
|
|
def action_repl(self) -> None:
|
|
with self.suspend():
|
|
self.repl()
|
|
|
|
async def action_quit(self) -> None:
|
|
self.reloader.cancel()
|
|
await wait([self.reloader], timeout=1)
|
|
|
|
await super().action_quit()
|
|
|
|
@contextmanager
|
|
def suspend(self) -> Iterator[None]:
|
|
driver = self._driver
|
|
|
|
if driver is not None:
|
|
driver.stop_application_mode()
|
|
with redirect_stdout(sys.__stdout__), redirect_stderr(sys.__stderr__):
|
|
yield
|
|
driver.start_application_mode()
|
|
|
|
@property
|
|
def deck_grid_width(self) -> int:
|
|
return max(self.size.width // 35, 1)
|
|
|
|
|
|
def present(deck_path: Path | str, watch_path: Path | str | None = None) -> None:
|
|
"""
|
|
Present the deck defined in the given `deck_path`.
|
|
|
|
Args:
|
|
deck_path: The file to look for a deck in.
|
|
watch_path: When filesystem changes are detected below this path (recursively), reload the deck from the `deck_path`.
|
|
If `None` (the default), use the parent directory of the `deck_path`.
|
|
"""
|
|
os.environ["TEXTUAL"] = ",".join(sorted({"debug", "devtools"}))
|
|
|
|
deck_path = Path(deck_path).resolve()
|
|
watch_path = Path(watch_path or deck_path.parent).resolve()
|
|
|
|
SpielApp(deck_path=deck_path, watch_path=watch_path).run()
|