from collections import deque
import math
import os
import threading
import time

from .RAIILock import RAIILock


class TextCanvas:
    """A character grid that can be drawn on and printed to a terminal.

    Drawing calls take *virtual* coordinates, which are scaled onto the
    internal character grid. The virtual height is the grid height multiplied
    by ``text_to_height_width`` to compensate for characters being taller
    than they are wide.

    Most methods accept ``acquire_locks``. When it is False the internal
    ``RAIILock`` is constructed with ``defer=True``; this file uses that form
    when the calling method already holds the instance lock.

    A background "conveyor" thread opens a new frames-per-second bucket once
    per second. Call ``shutdown()`` (or use the instance as a context manager)
    to stop it.
    """

    __DEFAULT_WIDTH = 80
    __DEFAULT_HEIGHT = 22
    __TERMINAL_UPDATE_INTERVAL = 1
    __BACKGROUND_CHARACTER = " "
    __DRAW_RESOLUTION = 3
    __DEFAULT_TEXT_HEIGHT_TO_WIDTH_RATIO = 2.0
    __MAX_FPS_SAMPLES = 100

    def __init__(
            self,
            output_width=None, output_height=None, text_to_height_width=None,
            auto_adjust_to_terminal=True,
            print_upward=True
    ):
        """Create the canvas and start the maintenance thread.

        :param output_width: Grid width in characters (default 80).
        :param output_height: Grid height in characters (default 22).
        :param text_to_height_width: Virtual-height multiplier (default 2.0).
        :param auto_adjust_to_terminal: Resize the grid to the terminal on
            every ``clear()``.
        :param print_upward: Emit cursor-up sequences before each ``print()``
            so successive frames overwrite each other.
        """
        self.__lock = threading.RLock()

        # Real dimensions
        if output_width is None:
            output_width = self.__DEFAULT_WIDTH
        if output_height is None:
            output_height = self.__DEFAULT_HEIGHT
        self.__grid_width = output_width
        self.__grid_height = output_height

        # Virtual dimensions
        if text_to_height_width is None:
            text_to_height_width = self.__DEFAULT_TEXT_HEIGHT_TO_WIDTH_RATIO
        self.__text_to_height_width = text_to_height_width
        self.__virtual_width = None
        self.__virtual_height = None
        self._auto_calculate_virtual_dimensions(acquire_locks=False)
        self._update_virtual_ratios(acquire_locks=False)

        # Terminal stuff
        self.__terminal_width = 0
        self.__terminal_height = 0
        self.__auto_adjust_to_terminal = auto_adjust_to_terminal
        self.__last_terminal_dimensions_update = time.time()
        if self.__auto_adjust_to_terminal:
            self._update_terminal_dimensions(force=True)

        self.__print_upward = print_upward

        # Actual grid (stays empty until clear() is called)
        self.__grid = []
        self.__render_string = ""
        self.__is_dirty = True
        self.__last_render_time = time.time()
        self.__last_get_render_time = time.time()

        # One integer bucket per second; the newest bucket is on the right.
        self.__fps_samples = deque(maxlen=self.__MAX_FPS_SAMPLES)

        self.__conveyor_enabled = False
        self.__conveyor_thread = None
        # Set to wake the conveyor immediately when it should stop.
        self.__conveyor_stop_event = threading.Event()
        self._start_conveyor()

    def __enter__(self):
        """Allow ``with TextCanvas() as canvas:``; pairs with ``__exit__``."""
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Stop the maintenance thread when leaving a ``with`` block."""
        self.shutdown()

    def shutdown(self):
        """Stop the background maintenance thread."""
        self._stop_conveyor()

    def _start_conveyor(self, acquire_locks=True):
        """(Re)start the maintenance thread.

        Any running thread is stopped first. That happens before the lock is
        taken here, because joining the thread while holding the lock would
        deadlock against the thread's own lock acquisition.
        """
        self._stop_conveyor(acquire_locks=acquire_locks)
        with RAIILock(self.__lock, defer=not acquire_locks):
            self.__conveyor_stop_event.clear()
            self.__conveyor_enabled = True
            # Daemon: a forgotten shutdown() must not keep the process alive.
            worker = threading.Thread(target=self._conveyor, daemon=True)
            self.__conveyor_thread = worker
            worker.start()

    def _stop_conveyor(self, acquire_locks=True):
        """Signal the maintenance thread to stop and wait for it to finish.

        Must not be called while the calling thread holds the instance lock:
        the worker needs that lock to finish its current iteration.
        """
        with RAIILock(self.__lock, defer=not acquire_locks):
            worker = self.__conveyor_thread
            self.__conveyor_thread = None
            self.__conveyor_enabled = False
        if worker is None:
            return
        # Join outside the lock so the worker can still acquire it and exit.
        self.__conveyor_stop_event.set()
        worker.join()

    def _conveyor(self):
        """Thread body: open a new FPS bucket once per second until stopped."""
        while self.__conveyor_enabled is True:
            with RAIILock(self.__lock):
                self.__fps_samples.append(0)
            # Waits up to one second, but returns at once when stopping.
            if self.__conveyor_stop_event.wait(1):
                break

    def _update_virtual_ratios(self, acquire_locks=True):
        """Recompute how many virtual units one grid cell spans on each axis."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            self.__virtual_height_per_grid_height = self.__virtual_height / self.__grid_height
            self.__virtual_width_per_grid_width = self.__virtual_width / self.__grid_width

    def _update_render_if_dirty(self, acquire_locks=True):
        """Rebuild the cached render string only if the grid has changed."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            if self.__is_dirty:
                self._update_render(acquire_locks=False)

    def _update_render(self, acquire_locks=True):
        """Rebuild the cached render string and mark the canvas clean."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            self._update_render_string(acquire_locks=False)
            self.__last_render_time = time.time()
            self.__is_dirty = False

    def _update_render_string(self, acquire_locks=True):
        """Join the grid rows into one newline-separated string."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            # No trailing line feed after the last row.
            self.__render_string = "\n".join("".join(row) for row in self.__grid)

    def get(self, acquire_locks=True):
        """Return the rendered canvas as a string and count it as one frame."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            self._record_frame_get(acquire_locks=False)
            self._update_render_if_dirty(acquire_locks=False)
            return self.__render_string

    def _record_frame_get(self, acquire_locks=True):
        """Count one frame in the current (most recently opened) FPS bucket."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            if not self.__fps_samples:
                return
            # The conveyor appends on the right, so [-1] is the current second.
            self.__fps_samples[-1] += 1

    def get_fps(self, acquire_locks=True):
        """Return the mean frames per bucket, or 0 when no bucket exists yet."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            if not self.__fps_samples:
                return 0
            return sum(self.__fps_samples) / len(self.__fps_samples)

    def print(self, acquire_locks=True):
        """Print the canvas to stdout, optionally moving the cursor up first."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            self._update_render_if_dirty(acquire_locks=False)

            # Move the cursor up one line per grid row to compensate
            prefix = ""
            if self.__print_upward:
                prefix = "\033[F" * self.__grid_height

            print(prefix + self.get(acquire_locks=False))

    def draw_text(self, x, y, text, center=False, acquire_locks=True):
        """Write ``text`` left to right starting at virtual position (x, y).

        :param text: Converted with ``str()`` when it is not already a string.
        :param center: When True, shift the start left by half the text length.
        """
        with RAIILock(self.__lock, defer=not acquire_locks):
            if not isinstance(text, str):
                text = str(text)
            if not text:
                return

            if center is True:
                x -= len(text) / 2
                x = int(x)

            for offset, c in enumerate(text):
                self.write(x=x + offset, y=y, character=c, acquire_locks=False)

    def draw_line(self, start_x, start_y, end_x, end_y, character="*", acquire_locks=True):
        """Draw a straight line of ``character`` between two virtual points.

        The line is sampled at ``__DRAW_RESOLUTION`` steps per unit of the
        longer axis. Nothing is drawn when that rounds to zero steps.
        """
        with RAIILock(self.__lock, defer=not acquire_locks):
            distance_x = math.fabs(end_x - start_x)
            distance_y = math.fabs(end_y - start_y)
            distance = distance_x if distance_x > distance_y else distance_y

            direction_x = 1 if end_x > start_x else -1
            direction_y = 1 if end_y > start_y else -1

            steps = round(distance * self.__DRAW_RESOLUTION)
            if steps == 0:
                return

            increment_x = distance_x / steps
            increment_x *= direction_x
            increment_y = distance_y / steps
            increment_y *= direction_y

            pos_x = start_x
            pos_y = start_y
            for _ in range(steps):
                self.write(
                    x=round(pos_x), y=round(pos_y),
                    character=character, acquire_locks=False
                )
                pos_x += increment_x
                pos_y += increment_y

    def write(self, x, y, character, acquire_locks=True):
        """Place ``character`` at virtual (x, y); off-grid writes are ignored."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            x, y = self._translate_virtual_to_internal_coordinates(x=x, y=y)

            if 0 <= y < len(self.__grid):
                if 0 <= x < len(self.__grid[y]):
                    self.__grid[y][x] = character
                    self.__is_dirty = True

    def clear(self, acquire_locks=True):
        """Resize to the terminal (if enabled) and fill the grid with blanks."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            self._auto_adjust_to_terminal(acquire_locks=False)

            self.__grid = [
                [self.__BACKGROUND_CHARACTER] * self.__grid_width
                for _ in range(self.__grid_height)
            ]
            self.__is_dirty = True

    def get_dimensions(self, acquire_locks=True):
        """Return ``(width, height)`` in virtual units."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            return self.get_virtual_dimensions(acquire_locks=False)

    def get_virtual_dimensions(self, acquire_locks=True):
        """Return ``(virtual_width, virtual_height)``."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            return self.__virtual_width, self.__virtual_height

    def _get_internal_dimensions(self, acquire_locks=True):
        """Return ``(grid_width, grid_height)`` in characters."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            return self.__grid_width, self.__grid_height

    def _translate_virtual_to_internal_coordinates(self, x, y):
        """Scale a virtual (x, y) to the nearest grid cell indices."""
        x_translated = round(x / self.__virtual_width_per_grid_width)
        y_translated = round(y / self.__virtual_height_per_grid_height)
        return x_translated, y_translated

    def _is_time_to_update_terminal_dimensions(self):
        """Return True when auto-adjust is on and the update interval elapsed."""
        if self.__auto_adjust_to_terminal:
            now = time.time()
            if now - self.__last_terminal_dimensions_update > self.__TERMINAL_UPDATE_INTERVAL:
                return True
        return False

    def _auto_adjust_to_terminal(self, force=False, acquire_locks=True):
        """Resize the grid to the terminal size.

        Does nothing when auto-adjust is disabled, unless ``force`` is set.
        """
        if not force and not self.__auto_adjust_to_terminal:
            return

        with RAIILock(self.__lock, defer=not acquire_locks):
            # Forward ``force`` so a forced adjust also forces the size query.
            self._update_terminal_dimensions(force=force, acquire_locks=False)
            self.__grid_width = self.__terminal_width
            self.__grid_height = self.__terminal_height
            self._auto_calculate_virtual_dimensions(acquire_locks=False)

    def _auto_calculate_virtual_dimensions(self, acquire_locks=True):
        """Derive the virtual dimensions from the grid and refresh the ratios."""
        with RAIILock(self.__lock, defer=not acquire_locks):
            self.__virtual_width = self.__grid_width
            self.__virtual_height = self.__grid_height * self.__text_to_height_width
            self._update_virtual_ratios(acquire_locks=False)

    def _update_terminal_dimensions(self, force=False, acquire_locks=True):
        """Query and store the terminal size.

        The query is skipped when the update interval has not elapsed, unless
        ``force`` is True. The stored height is one less than the reported
        height, and both values are kept at 1 or more so the ratio
        computation never divides by zero.
        """
        with RAIILock(self.__lock, defer=not acquire_locks):
            if not force and not self._is_time_to_update_terminal_dimensions():
                return
            self.__last_terminal_dimensions_update = time.time()

            width, height = self.determine_terminal_dimensions()
            self.__terminal_width = max(1, width)
            self.__terminal_height = max(1, height - 1)

            self._auto_calculate_virtual_dimensions(acquire_locks=False)

    @staticmethod
    def determine_terminal_dimensions():
        """Return ``(columns, lines)`` of the terminal.

        Falls back to the class default width and height when the size
        cannot be queried (``os.get_terminal_size`` raises ``OSError``, for
        example when output is not attached to a terminal).
        """
        try:
            columns, lines = os.get_terminal_size()
        except OSError:
            return TextCanvas.__DEFAULT_WIDTH, TextCanvas.__DEFAULT_HEIGHT
        return columns, lines