From 45d3b096f3b1ad4b9188772a9528ae7032292ae6 Mon Sep 17 00:00:00 2001 From: deadc0de6 Date: Tue, 7 Mar 2023 22:37:49 +0100 Subject: [PATCH] refactor node types --- catcli/catalog.py | 20 ++-- catcli/catcli.py | 24 ++--- catcli/cnode.py | 176 ++++++++++++++++++++++++++++------- catcli/fuser.py | 22 ++--- catcli/nodeprinter.py | 5 +- catcli/noder.py | 212 +++++++++++++++++++----------------------- catcli/walker.py | 14 +-- 7 files changed, 284 insertions(+), 189 deletions(-) diff --git a/catcli/catalog.py b/catcli/catalog.py index 821327c..2cdf8ed 100644 --- a/catcli/catalog.py +++ b/catcli/catalog.py @@ -12,7 +12,7 @@ from anytree.exporter import JsonExporter # type: ignore from anytree.importer import JsonImporter # type: ignore # local imports -from catcli.cnode import Node +from catcli.cnode import NodeMeta, NodeTop from catcli.utils import ask from catcli.logger import Logger @@ -33,10 +33,10 @@ class Catalog: self.path = path self.debug = debug self.force = force - self.metanode: Optional[Node] = None + self.metanode: Optional[NodeMeta] = None self.pickle = usepickle - def set_metanode(self, metanode: Node) -> None: + def set_metanode(self, metanode: NodeMeta) -> None: """remove the metanode until tree is re-written""" self.metanode = metanode if self.metanode: @@ -50,7 +50,7 @@ class Catalog: return True return False - def restore(self) -> Optional[Node]: + def restore(self) -> Optional[NodeTop]: """restore the catalog""" if not self.path: return None @@ -62,7 +62,7 @@ class Catalog: content = file.read() return self._restore_json(content) - def save(self, node: Node) -> bool: + def save(self, node: NodeTop) -> bool: """save the catalog""" if not self.path: Logger.err('Path not defined') @@ -88,14 +88,14 @@ class Catalog: return Logger.debug(text) - def _save_pickle(self, node: Node) -> bool: + def _save_pickle(self, node: NodeTop) -> bool: """pickle the catalog""" with open(self.path, 'wb') as file: pickle.dump(node, file) self._debug(f'Catalog saved to pickle \"{self.path}\"') return True - def _restore_pickle(self) -> Union[Node, Any]: + def _restore_pickle(self) -> Union[NodeTop, Any]: """restore the pickled tree""" with open(self.path, 'rb') as file: root = pickle.load(file) @@ -103,7 +103,7 @@ class Catalog: self._debug(msg) return root - def _save_json(self, node: Node) -> bool: + def _save_json(self, node: NodeTop) -> bool: """export the catalog in json""" exp = JsonExporter(indent=2, sort_keys=True) with open(self.path, 'w', encoding='UTF-8') as file: @@ -111,9 +111,9 @@ class Catalog: self._debug(f'Catalog saved to json \"{self.path}\"') return True - def _restore_json(self, string: str) -> Node: + def _restore_json(self, string: str) -> NodeTop: """restore the tree from json""" imp = JsonImporter() root = imp.import_(string) self._debug(f'Catalog imported from json \"{self.path}\"') - return cast(Node, root) + return cast(NodeTop, root) diff --git a/catcli/catcli.py b/catcli/catcli.py index efcbfb8..c8ffea9 100755 --- a/catcli/catcli.py +++ b/catcli/catcli.py @@ -17,11 +17,11 @@ from docopt import docopt # local imports from catcli import cnode from catcli.version import __version__ as VERSION +from catcli.cnode import NodeTop, NodeAny from catcli.logger import Logger from catcli.colors import Colors from catcli.catalog import Catalog from catcli.walker import Walker -from catcli.cnode import Node from catcli.noder import Noder from catcli.utils import ask, edit from catcli.fuser import Fuser @@ -82,7 +82,7 @@ Options: def cmd_mount(args: Dict[str, Any], - top: Node, + top: NodeTop, noder: Noder) -> None: """mount action""" mountpoint = args[''] @@ -94,7 +94,7 @@ def cmd_mount(args: Dict[str, Any], def cmd_index(args: Dict[str, Any], noder: Noder, catalog: Catalog, - top: Node) -> None: + top: NodeTop) -> None: """index action""" path = args[''] name = args[''] @@ -117,8 +117,8 @@ def cmd_index(args: Dict[str, Any], start = datetime.datetime.now() walker = Walker(noder, usehash=usehash, debug=debug) - attr = noder.attrs_to_string(args['--meta']) - root = noder.new_storage_node(name, path, parent=top, attrs=attr) + attr = args['--meta'] + root = noder.new_storage_node(name, path, top, attr) _, cnt = walker.index(path, root, name) if subsize: noder.rec_size(root, store=True) @@ -132,7 +132,7 @@ def cmd_index(args: Dict[str, Any], def cmd_update(args: Dict[str, Any], noder: Noder, catalog: Catalog, - top: Node) -> None: + top: NodeTop) -> None: """update action""" path = args[''] name = args[''] @@ -162,7 +162,7 @@ def cmd_update(args: Dict[str, Any], def cmd_ls(args: Dict[str, Any], noder: Noder, - top: Node) -> List[Node]: + top: NodeTop) -> List[NodeAny]: """ls action""" path = args[''] if not path: @@ -197,7 +197,7 @@ def cmd_ls(args: Dict[str, Any], def cmd_rm(args: Dict[str, Any], noder: Noder, catalog: Catalog, - top: Node) -> Node: + top: NodeTop) -> NodeTop: """rm action""" name = args[''] node = noder.get_storage_node(top, name) @@ -212,7 +212,7 @@ def cmd_rm(args: Dict[str, Any], def cmd_find(args: Dict[str, Any], noder: Noder, - top: Node) -> List[Node]: + top: NodeTop) -> List[NodeAny]: """find action""" fromtree = args['--parent'] directory = args['--directory'] @@ -232,7 +232,7 @@ def cmd_find(args: Dict[str, Any], def cmd_graph(args: Dict[str, Any], noder: Noder, - top: Node) -> None: + top: NodeTop) -> None: """graph action""" path = args[''] if not path: @@ -243,7 +243,7 @@ def cmd_graph(args: Dict[str, Any], def cmd_rename(args: Dict[str, Any], catalog: Catalog, - top: Node) -> None: + top: NodeTop) -> None: """rename action""" storage = args[''] new = args[''] @@ -261,7 +261,7 @@ def cmd_rename(args: Dict[str, Any], def cmd_edit(args: Dict[str, Any], noder: Noder, catalog: Catalog, - top: Node) -> None: + top: NodeTop) -> None: """edit action""" storage = args[''] storages = list(x.name for x in top.children) diff --git a/catcli/cnode.py b/catcli/cnode.py index 599d4fa..3742b87 100644 --- a/catcli/cnode.py +++ b/catcli/cnode.py @@ -6,57 +6,39 @@ Class that represents a node in the catalog tree """ # pylint: disable=W0622 +from typing import Dict, Any from anytree import NodeMixin # type: ignore -TYPE_TOP = 'top' -TYPE_FILE = 'file' -TYPE_DIR = 'dir' -TYPE_ARC = 'arc' -TYPE_STORAGE = 'storage' -TYPE_META = 'meta' +_TYPE_BAD = 'badtype' +_TYPE_TOP = 'top' +_TYPE_FILE = 'file' +_TYPE_DIR = 'dir' +_TYPE_ARC = 'arc' +_TYPE_STORAGE = 'storage' +_TYPE_META = 'meta' NAME_TOP = 'top' NAME_META = 'meta' -class Node(NodeMixin): # type: ignore - """a node in the catalog""" +class NodeAny(NodeMixin): # type: ignore + """generic node""" def __init__(self, # type: ignore[no-untyped-def] - name: str, - type: str, - size: float = 0, - relpath: str = '', - md5: str = '', - maccess: float = 0, - free: int = 0, - total: int = 0, - indexed_dt: int = 0, - attr: str = '', - archive: str = '', parent=None, children=None): - """build a node""" + """build generic node""" super().__init__() - self.name = name - self.type = type - self.size = size - self.relpath = relpath - self.md5 = md5 - self.maccess = maccess - self.free = free - self.total = total - self.indexed_dt = indexed_dt - self.attr = attr - self.archive = archive + self._flagged = False self.parent = parent if children: self.children = children - self._flagged = False def flagged(self) -> bool: """is flagged""" + if not hasattr(self, '_flagged'): + return False return self._flagged def flag(self) -> None: @@ -66,3 +48,133 @@ class Node(NodeMixin): # type: ignore def unflag(self) -> None: """unflag node""" self._flagged = False + del self._flagged + + +class NodeTop(NodeAny): + """a top node""" + + def __init__(self, # type: ignore[no-untyped-def] + name: str, + children=None): + """build a top node""" + super().__init__() # type: ignore[no-untyped-call] + self.name = name + self.type = _TYPE_TOP + self.parent = None + if children: + self.children = children + + +class NodeFile(NodeAny): + """a file node""" + + def __init__(self, # type: ignore[no-untyped-def] + name: str, + relpath: str, + size: int, + md5: str, + maccess: float, + parent=None, + children=None): + """build a file node""" + super().__init__() # type: ignore[no-untyped-call] + self.name = name + self.type = _TYPE_FILE + self.relpath = relpath + self.size = size + self.md5 = md5 + self.maccess = maccess + self.parent = parent + if children: + self.children = children + + +class NodeDir(NodeAny): + """a directory node""" + + def __init__(self, # type: ignore[no-untyped-def] + name: str, + relpath: str, + size: int, + maccess: float, + parent=None, + children=None): + """build a directory node""" + super().__init__() # type: ignore[no-untyped-call] + self.name = name + self.type = _TYPE_DIR + self.relpath = relpath + self.size = size + self.maccess = maccess + self.parent = parent + if children: + self.children = children + + +class NodeArchived(NodeAny): + """an archived node""" + + def __init__(self, # type: ignore[no-untyped-def] + name: str, + relpath: str, + size: int, + md5: str, + archive: str, + parent=None, + children=None): + """build an archived node""" + super().__init__() # type: ignore[no-untyped-call] + self.name = name + self.type = _TYPE_ARC + self.relpath = relpath + self.size = size + self.md5 = md5 + self.archive = archive + self.parent = parent + if children: + self.children = children + + +class NodeStorage(NodeAny): + """a storage node""" + + def __init__(self, # type: ignore[no-untyped-def] + name: str, + free: int, + total: int, + size: int, + indexed_dt: float, + attr: Dict[str, Any], + parent=None, + children=None): + """build a storage node""" + super().__init__() # type: ignore[no-untyped-call] + self.name = name + self.type = _TYPE_STORAGE + self.free = free + self.total = total + self.attr = attr + self.size = size + self.indexed_dt = indexed_dt + self.parent = parent + if children: + self.children = children + + +class NodeMeta(NodeAny): + """a meta node""" + + def __init__(self, # type: ignore[no-untyped-def] + name: str, + attr: str, + parent=None, + children=None): + """build a meta node""" + super().__init__() # type: ignore[no-untyped-call] + self.name = name + self.type = _TYPE_META + self.attr = attr + self.parent = parent + if children: + self.children = children diff --git a/catcli/fuser.py b/catcli/fuser.py index 6a4b6cd..ef89a47 100644 --- a/catcli/fuser.py +++ b/catcli/fuser.py @@ -14,8 +14,8 @@ import fuse # type: ignore # local imports from catcli.noder import Noder +from catcli.cnode import NodeTop, NodeAny from catcli import cnode -from catcli.cnode import Node # build custom logger to log in /tmp @@ -34,7 +34,7 @@ class Fuser: """fuse filesystem mounter""" def __init__(self, mountpoint: str, - top: Node, + top: NodeTop, noder: Noder, debug: bool = False): """fuse filesystem""" @@ -50,13 +50,13 @@ class Fuser: class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore """in-memory filesystem for catcli catalog""" - def __init__(self, top: Node, + def __init__(self, top: NodeTop, noder: Noder): """init fuse filesystem""" self.top = top self.noder = noder - def _get_entry(self, path: str) -> Optional[Node]: + def _get_entry(self, path: str) -> Optional[NodeAny]: """return the node pointed by path""" pre = f'{SEPARATOR}{cnode.NAME_TOP}' if not path.startswith(pre): @@ -69,7 +69,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore return found[0] return None - def _get_entries(self, path: str) -> List[Node]: + def _get_entries(self, path: str) -> List[NodeAny]: """return nodes pointed by path""" pre = f'{SEPARATOR}{cnode.NAME_TOP}' if not path.startswith(pre): @@ -91,17 +91,17 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore curt = time() mode: Any = S_IFREG - if entry.type == cnode.TYPE_ARC: + if isinstance(entry, cnode.NodeArchived): mode = S_IFREG - elif entry.type == cnode.TYPE_DIR: + elif isinstance(entry, cnode.NodeDir): mode = S_IFDIR - elif entry.type == cnode.TYPE_FILE: + elif isinstance(entry, cnode.NodeFile): mode = S_IFREG - elif entry.type == cnode.TYPE_STORAGE: + elif isinstance(entry, cnode.NodeStorage): mode = S_IFDIR - elif entry.type == cnode.TYPE_META: + elif isinstance(entry, cnode.NodeMeta): mode = S_IFREG - elif entry.type == cnode.TYPE_TOP: + elif isinstance(entry, cnode.NodeTop): mode = S_IFREG return { 'st_mode': (mode), diff --git a/catcli/nodeprinter.py b/catcli/nodeprinter.py index 26505e6..79d6e39 100644 --- a/catcli/nodeprinter.py +++ b/catcli/nodeprinter.py @@ -6,7 +6,8 @@ Class for printing nodes """ import sys -from typing import TypeVar, Type, Optional, Tuple, List +from typing import TypeVar, Type, Optional, Tuple, List, \ + Dict, Any from catcli.colors import Colors from catcli.utils import fix_badchars @@ -25,7 +26,7 @@ class NodePrinter: @classmethod def print_storage_native(cls: Type[CLASSTYPE], pre: str, name: str, args: str, - attr: str) -> None: + attr: Dict[str, Any]) -> None: """print a storage node""" end = '' if attr: diff --git a/catcli/noder.py b/catcli/noder.py index b420c35..dc4d855 100644 --- a/catcli/noder.py +++ b/catcli/noder.py @@ -14,7 +14,8 @@ from pyfzf.pyfzf import FzfPrompt # type: ignore # local imports from catcli import cnode -from catcli.cnode import Node +from catcli.cnode import NodeAny, NodeStorage, \ + NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars from catcli.logger import Logger from catcli.nodeprinter import NodePrinter @@ -53,20 +54,20 @@ class Noder: self.decomp = Decomp() @staticmethod - def get_storage_names(top: Node) -> List[str]: + def get_storage_names(top: NodeTop) -> List[str]: """return a list of all storage names""" return [x.name for x in list(top.children)] - def get_storage_node(self, top: Node, + def get_storage_node(self, top: NodeTop, name: str, - newpath: str = '') -> Node: + newpath: str = '') -> NodeStorage: """ return the storage node if any if newpath is submitted, it will update the media info """ found = None for node in top.children: - if node.type != cnode.TYPE_STORAGE: + if not isinstance(node, cnode.NodeStorage): continue if node.name == name: found = node @@ -75,27 +76,27 @@ class Noder: found.free = shutil.disk_usage(newpath).free found.total = shutil.disk_usage(newpath).total found.ts = int(time.time()) - return cast(Node, found) + return cast(NodeStorage, found) @staticmethod - def get_node(top: Node, + def get_node(top: NodeTop, path: str, - quiet: bool = False) -> Optional[Node]: + quiet: bool = False) -> Optional[NodeAny]: """get the node by internal tree path""" resolv = anytree.resolver.Resolver('name') try: bpath = os.path.basename(path) the_node = resolv.get(top, bpath) - return cast(Node, the_node) + return cast(NodeAny, the_node) except anytree.resolver.ChildResolverError: if not quiet: Logger.err(f'No node at path \"{bpath}\"') return None def get_node_if_changed(self, - top: Node, + top: NodeTop, path: str, - treepath: str) -> Tuple[Optional[Node], bool]: + treepath: str) -> Tuple[Optional[NodeAny], bool]: """ return the node (if any) and if it has changed @top: top node (storage) @@ -131,25 +132,25 @@ class Noder: self._debug(f'\tchange: no change for \"{path}\"') return node, False - def rec_size(self, node: Node, - store: bool = True) -> float: + def rec_size(self, node: Union[NodeDir, NodeStorage], + store: bool = True) -> int: """ recursively traverse tree and return size @store: store the size in the node """ - if node.type == cnode.TYPE_FILE: + if isinstance(node, cnode.NodeFile): self._debug(f'getting node size for \"{node.name}\"') - return float(node.size) + return node.size msg = f'getting node size recursively for \"{node.name}\"' self._debug(msg) - size: float = 0 + size: int = 0 for i in node.children: - if node.type == cnode.TYPE_DIR: + if isinstance(node, cnode.NodeDir): size = self.rec_size(i, store=store) if store: i.size = size size += size - if node.type == cnode.TYPE_STORAGE: + if isinstance(node, cnode.NodeStorage): size = self.rec_size(i, store=store) if store: i.size = size @@ -185,13 +186,12 @@ class Noder: ############################################################### # node creation ############################################################### - def new_top_node(self) -> Node: + def new_top_node(self) -> NodeTop: """create a new top node""" - return Node(cnode.NAME_TOP, - cnode.TYPE_TOP) + return NodeTop(cnode.NAME_TOP) def new_file_node(self, name: str, path: str, - parent: Node, storagepath: str) -> Optional[Node]: + parent: NodeAny, storagepath: str) -> Optional[NodeFile]: """create a new node representing a file""" if not os.path.exists(path): Logger.err(f'File \"{path}\" does not exist') @@ -208,11 +208,12 @@ class Noder: relpath = os.sep.join([storagepath, name]) maccess = os.path.getmtime(path) - node = self._new_generic_node(name, cnode.TYPE_FILE, - relpath, parent, - size=stat.st_size, - md5=md5, - maccess=maccess) + node = NodeFile(name, + relpath, + stat.st_size, + md5, + maccess, + parent=parent) if self.arc: ext = os.path.splitext(path)[1][1:] if ext.lower() in self.decomp.get_formats(): @@ -224,59 +225,46 @@ class Noder: return node def new_dir_node(self, name: str, path: str, - parent: Node, storagepath: str) -> Node: + parent: NodeAny, storagepath: str) -> NodeDir: """create a new node representing a directory""" path = os.path.abspath(path) relpath = os.sep.join([storagepath, name]) maccess = os.path.getmtime(path) - return self._new_generic_node(name, cnode.TYPE_DIR, relpath, - parent, maccess=maccess) + return NodeDir(name, + relpath, + 0, + maccess, + parent=parent) def new_storage_node(self, name: str, path: str, parent: str, - attrs: str = '') -> Node: + attrs: Dict[str, Any]) \ + -> NodeStorage: """create a new node representing a storage""" path = os.path.abspath(path) free = shutil.disk_usage(path).free total = shutil.disk_usage(path).total epoch = int(time.time()) - return Node(name=name, - type=cnode.TYPE_STORAGE, - free=free, - total=total, - parent=parent, - attr=attrs, - indexed_dt=epoch) + return NodeStorage(name, + free, + total, + 0, + epoch, + attrs, + parent=parent) def new_archive_node(self, name: str, path: str, - parent: str, archive: str) -> Node: + parent: str, archive: str) -> NodeArchived: """create a new node for archive data""" - return Node(name=name, type=cnode.TYPE_ARC, relpath=path, - parent=parent, size=0, md5='', - archive=archive) - - @staticmethod - def _new_generic_node(name: str, - nodetype: str, - relpath: str, - parent: Node, - size: float = 0, - md5: str = '', - maccess: float = 0) -> Node: - """generic node creation""" - return Node(name, - nodetype, - size=size, - relpath=relpath, - md5=md5, - maccess=maccess, - parent=parent) + return NodeArchived(name=name, relpath=path, + parent=parent, size=0, md5='', + archive=archive) ############################################################### # node management ############################################################### - def update_metanode(self, top: Node) -> Node: + def update_metanode(self, top: NodeTop) -> NodeMeta: """create or update meta node information""" meta = self._get_meta_node(top) epoch = int(time.time()) @@ -284,9 +272,8 @@ class Noder: attrs: Dict[str, Any] = {} attrs['created'] = epoch attrs['created_version'] = VERSION - meta = Node(name=cnode.NAME_META, - type=cnode.TYPE_META, - attr=self.attrs_to_string(attrs)) + meta = NodeMeta(name=cnode.NAME_META, + attr=self.attrs_to_string(attrs)) if meta.attr: meta.attr += ', ' meta.attr += f'access={epoch}' @@ -294,26 +281,26 @@ class Noder: meta.attr += f'access_version={VERSION}' return meta - def _get_meta_node(self, top: Node) -> Optional[Node]: + def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]: """return the meta node if any""" try: - found = next(filter(lambda x: x.type == cnode.TYPE_META, + found = next(filter(lambda x: isinstance(x, cnode.NodeMeta), top.children)) - return cast(Node, found) + return cast(NodeMeta, found) except StopIteration: return None - def clean_not_flagged(self, top: Node) -> int: + def clean_not_flagged(self, top: NodeTop) -> int: """remove any node not flagged and clean flags""" cnt = 0 for node in anytree.PreOrderIter(top): - if node.type not in [cnode.TYPE_FILE, cnode.TYPE_DIR]: + if not isinstance(node, (cnode.NodeDir, cnode.NodeFile)): continue if self._clean(node): cnt += 1 return cnt - def _clean(self, node: Node) -> bool: + def _clean(self, node: NodeAny) -> bool: """remove node if not flagged""" if not node.flagged(): node.parent = None @@ -324,7 +311,7 @@ class Noder: ############################################################### # printing ############################################################### - def _node_to_csv(self, node: Node, + def _node_to_csv(self, node: NodeAny, sep: str = ',', raw: bool = False) -> None: """ @@ -333,7 +320,7 @@ class Noder: @sep: CSV separator character @raw: print raw size rather than human readable """ - if not cnode: + if not node: return if node.type == node.TYPE_TOP: return @@ -374,7 +361,7 @@ class Noder: out.append(node.md5) # md5 else: out.append('') # fake md5 - if node.type == cnode.TYPE_DIR: + if isinstance(node, cnode.NodeDir): out.append(str(len(node.children))) # nbfiles else: out.append('') # fake nbfiles @@ -386,7 +373,7 @@ class Noder: if len(line) > 0: Logger.stdout_nocolor(line) - def _print_node_native(self, node: Node, + def _print_node_native(self, node: NodeAny, pre: str = '', withpath: bool = False, withdepth: bool = False, @@ -403,10 +390,10 @@ class Noder: @recalcparent: get relpath from tree instead of relpath field @raw: print raw size rather than human readable """ - if node.type == cnode.TYPE_TOP: + if isinstance(node, cnode.NodeTop): # top node Logger.stdout_nocolor(f'{pre}{node.name}') - elif node.type == cnode.TYPE_FILE: + elif isinstance(node, cnode.NodeFile): # node of type file name = node.name if withpath: @@ -426,7 +413,7 @@ class Noder: content = Logger.get_bold_text(storage.name) compl += f', storage:{content}' NodePrinter.print_file_native(pre, name, compl) - elif node.type == cnode.TYPE_DIR: + elif isinstance(node, cnode.NodeDir): # node of type directory name = node.name if withpath: @@ -446,7 +433,7 @@ class Noder: if withstorage: attr.append(('storage', Logger.get_bold_text(storage.name))) NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr) - elif node.type == cnode.TYPE_STORAGE: + elif isinstance(node, cnode.NodeStorage): # node of type storage sztotal = size_to_str(node.total, raw=raw) szused = size_to_str(node.total - node.free, raw=raw) @@ -476,14 +463,14 @@ class Noder: name, argsstring, node.attr) - elif node.type == cnode.TYPE_ARC: + elif isinstance(node, cnode.NodeArchived): # archive node if self.arc: NodePrinter.print_archive_native(pre, node.name, node.archive) else: Logger.err(f'bad node encountered: {node}') - def print_tree(self, node: Node, + def print_tree(self, node: NodeAny, fmt: str = 'native', raw: bool = False) -> None: """ @@ -507,7 +494,7 @@ class Noder: Logger.stdout_nocolor(self.CSV_HEADER) self._to_csv(node, raw=raw) - def _to_csv(self, node: Node, + def _to_csv(self, node: NodeAny, raw: bool = False) -> None: """print the tree to csv""" rend = anytree.RenderTree(node, childiter=self._sort_tree) @@ -521,7 +508,7 @@ class Noder: selected = fzf.prompt(strings) return selected - def _to_fzf(self, node: Node, fmt: str) -> None: + def _to_fzf(self, node: NodeAny, fmt: str) -> None: """ fzf prompt with list and print selected node(s) @node: node to start with @@ -550,24 +537,24 @@ class Noder: self.print_tree(rend, fmt=subfmt) @staticmethod - def to_dot(node: Node, + def to_dot(top: NodeTop, path: str = 'tree.dot') -> str: """export to dot for graphing""" - anytree.exporter.DotExporter(node).to_dotfile(path) + anytree.exporter.DotExporter(top).to_dotfile(path) Logger.info(f'dot file created under \"{path}\"') return f'dot {path} -T png -o /tmp/tree.png' ############################################################### # searching ############################################################### - def find_name(self, top: Node, + def find_name(self, top: NodeTop, key: str, script: bool = False, only_dir: bool = False, - startnode: Optional[Node] = None, + startnode: Optional[NodeAny] = None, parentfromtree: bool = False, fmt: str = 'native', - raw: bool = False) -> List[Node]: + raw: bool = False) -> List[NodeAny]: """ find files based on their names @top: top node @@ -583,7 +570,7 @@ class Noder: self._debug(f'searching for \"{key}\"') # search for nodes based on path - start: Optional[Node] = top + start: Optional[NodeAny] = top if startnode: start = self.get_node(top, startnode) filterfunc = self._callback_find_name(key, only_dir) @@ -594,7 +581,9 @@ class Noder: # compile found nodes paths = {} for item in found: - item = self._sanitize(item) + item.name = fix_badchars(item.name) + if hasattr(item, 'relpath'): + item.relpath = fix_badchars(item.relpath) if parentfromtree: paths[self._get_parents(item)] = item else: @@ -636,17 +625,17 @@ class Noder: def _callback_find_name(self, term: str, only_dir: bool) -> Any: """callback for finding files""" - def find_name(node: Node) -> bool: - if node.type == cnode.TYPE_STORAGE: + def find_name(node: NodeAny) -> bool: + if isinstance(node, cnode.NodeStorage): # ignore storage nodes return False - if node.type == cnode.TYPE_TOP: + if isinstance(node, cnode.NodeTop): # ignore top nodes return False - if node.type == cnode.TYPE_META: + if isinstance(node, cnode.NodeMeta): # ignore meta nodes return False - if only_dir and node.type != cnode.TYPE_DIR: + if only_dir and isinstance(node, cnode.NodeDir): # ignore non directory return False @@ -663,11 +652,11 @@ class Noder: ############################################################### # ls ############################################################### - def list(self, top: Node, + def list(self, top: NodeTop, path: str, rec: bool = False, fmt: str = 'native', - raw: bool = False) -> List[Node]: + raw: bool = False) -> List[NodeAny]: """ list nodes for "ls" @top: top node @@ -729,7 +718,7 @@ class Noder: # tree creation ############################################################### def _add_entry(self, name: str, - top: Node, + top: NodeTop, resolv: Any) -> None: """add an entry to the tree""" entries = name.rstrip(os.sep).split(os.sep) @@ -744,7 +733,7 @@ class Noder: except anytree.resolver.ChildResolverError: self.new_archive_node(nodename, name, top, top.name) - def list_to_tree(self, parent: Node, names: List[str]) -> None: + def list_to_tree(self, parent: NodeAny, names: List[str]) -> None: """convert list of files to a tree""" if not names: return @@ -757,23 +746,23 @@ class Noder: # diverse ############################################################### def _sort_tree(self, - items: List[Node]) -> List[Node]: + items: List[NodeAny]) -> List[NodeAny]: """sorting a list of items""" return sorted(items, key=self._sort, reverse=self.sortsize) - def _sort(self, lst: Node) -> Any: + def _sort(self, lst: NodeAny) -> Any: """sort a list""" if self.sortsize: return self._sort_size(lst) return self._sort_fs(lst) @staticmethod - def _sort_fs(node: Node) -> Tuple[str, str]: + def _sort_fs(node: NodeAny) -> Tuple[str, str]: """sorting nodes dir first and alpha""" return (node.type, node.name.lstrip('.').lower()) @staticmethod - def _sort_size(node: Node) -> float: + def _sort_size(node: NodeAny) -> float: """sorting nodes by size""" try: if not node.size: @@ -782,22 +771,22 @@ class Noder: except AttributeError: return 0 - def _get_storage(self, node: Node) -> Node: + def _get_storage(self, node: NodeAny) -> NodeStorage: """recursively traverse up to find storage""" - if node.type == cnode.TYPE_STORAGE: + if isinstance(node, cnode.NodeStorage): return node - return cast(Node, node.ancestors[1]) + return cast(NodeStorage, node.ancestors[1]) @staticmethod - def _has_attr(node: Node, attr: str) -> bool: + def _has_attr(node: NodeAny, attr: str) -> bool: """return True if node has attr as attribute""" return attr in node.__dict__.keys() - def _get_parents(self, node: Node) -> str: + def _get_parents(self, node: NodeAny) -> str: """get all parents recursively""" - if node.type == cnode.TYPE_STORAGE: + if isinstance(node, cnode.NodeStorage): return '' - if node.type == cnode.TYPE_TOP: + if isinstance(node, cnode.NodeTop): return '' parent = self._get_parents(node.parent) if parent: @@ -813,13 +802,6 @@ class Noder: Logger.err(str(exc)) return '' - @staticmethod - def _sanitize(node: Node) -> Node: - """sanitize node strings""" - node.name = fix_badchars(node.name) - node.relpath = fix_badchars(node.relpath) - return node - def _debug(self, string: str) -> None: """print debug""" if not self.debug: diff --git a/catcli/walker.py b/catcli/walker.py index e263fce..94fe17c 100644 --- a/catcli/walker.py +++ b/catcli/walker.py @@ -9,9 +9,9 @@ import os from typing import Tuple, Optional # local imports -from catcli.cnode import Node from catcli.noder import Noder from catcli.logger import Logger +from catcli.cnode import NodeAny, NodeTop class Walker: @@ -36,7 +36,7 @@ class Walker: self.lpath = logpath def index(self, path: str, - parent: Node, + parent: NodeAny, name: str, storagepath: str = '') -> Tuple[str, int]: """ @@ -89,15 +89,15 @@ class Walker: self._progress('') return parent, cnt - def reindex(self, path: str, parent: Node, top: Node) -> int: + def reindex(self, path: str, parent: NodeAny, top: NodeTop) -> int: """reindex a directory and store in tree""" cnt = self._reindex(path, parent, top) cnt += self.noder.clean_not_flagged(parent) return cnt def _reindex(self, path: str, - parent: Node, - top: Node, + parent: NodeAny, + top: NodeTop, storagepath: str = '') -> int: """ reindex a directory and store in tree @@ -148,9 +148,9 @@ class Walker: return cnt def _need_reindex(self, - top: Node, + top: NodeTop, path: str, - treepath: str) -> Tuple[bool, Optional[Node]]: + treepath: str) -> Tuple[bool, Optional[NodeTop]]: """ test if node needs re-indexing @top: top node (storage)