diff --git a/catcli/__init__.py b/catcli/__init__.py index 3205988..48ed909 100644 --- a/catcli/__init__.py +++ b/catcli/__init__.py @@ -7,7 +7,7 @@ Copyright (c) 2017, deadc0de6 import sys -def main(): +def main() -> None: """entry point""" import catcli.catcli if catcli.catcli.main(): diff --git a/catcli/catalog.py b/catcli/catalog.py index b5df73d..005d081 100644 --- a/catcli/catalog.py +++ b/catcli/catalog.py @@ -7,8 +7,9 @@ Class that represents the catcli catalog import os import pickle -from anytree.exporter import JsonExporter -from anytree.importer import JsonImporter +import anytree # type: ignore +from anytree.exporter import JsonExporter # type: ignore +from anytree.importer import JsonImporter # type: ignore # local imports from catcli.utils import ask @@ -18,7 +19,10 @@ from catcli.logger import Logger class Catalog: """the catalog""" - def __init__(self, path, usepickle=False, debug=False, force=False): + def __init__(self, path: str, + usepickle: bool = False, + debug: bool = False, + force: bool = False) -> None: """ @path: catalog path @usepickle: use pickle @@ -31,12 +35,13 @@ class Catalog: self.metanode = None self.pickle = usepickle - def set_metanode(self, metanode): + def set_metanode(self, metanode: anytree.AnyNode) -> None: """remove the metanode until tree is re-written""" self.metanode = metanode - self.metanode.parent = None + if self.metanode: + self.metanode.parent = None - def exists(self): + def exists(self) -> bool: """does catalog exist""" if not self.path: return False @@ -44,7 +49,7 @@ class Catalog: return True return False - def restore(self): + def restore(self) -> anytree.AnyNode: """restore the catalog""" if not self.path: return None @@ -56,7 +61,7 @@ class Catalog: content = file.read() return self._restore_json(content) - def save(self, node): + def save(self, node: anytree.AnyNode) -> bool: """save the catalog""" if not self.path: Logger.err('Path not defined') @@ -77,19 +82,19 @@ class Catalog: return self._save_pickle(node) return self._save_json(node) - def _debug(self, text): + def _debug(self, text: str) -> None: if not self.debug: return Logger.debug(text) - def _save_pickle(self, node): + def _save_pickle(self, node: anytree.AnyNode) -> bool: """pickle the catalog""" with open(self.path, 'wb') as file: pickle.dump(node, file) self._debug(f'Catalog saved to pickle \"{self.path}\"') return True - def _restore_pickle(self): + def _restore_pickle(self) -> anytree.AnyNode: """restore the pickled tree""" with open(self.path, 'rb') as file: root = pickle.load(file) @@ -97,7 +102,7 @@ class Catalog: self._debug(msg) return root - def _save_json(self, node): + def _save_json(self, node: anytree.AnyNode) -> bool: """export the catalog in json""" exp = JsonExporter(indent=2, sort_keys=True) with open(self.path, 'w', encoding='UTF-8') as file: @@ -105,7 +110,7 @@ class Catalog: self._debug(f'Catalog saved to json \"{self.path}\"') return True - def _restore_json(self, string): + def _restore_json(self, string: str) -> anytree.AnyNode: """restore the tree from json""" imp = JsonImporter() root = imp.import_(string) diff --git a/catcli/catcli.py b/catcli/catcli.py index 526d701..49f5e0a 100755 --- a/catcli/catcli.py +++ b/catcli/catcli.py @@ -11,6 +11,8 @@ Catcli command line interface import sys import os import datetime +from typing import Dict, Any, List +import anytree # type: ignore from docopt import docopt # local imports @@ -78,13 +80,20 @@ Options: """ # nopep8 -def cmd_mount(args, top, noder): +def cmd_mount(args: Dict[str, Any], + top: anytree.AnyNode, + noder: Noder) -> None: """mount action""" mountpoint = args[''] - Fuser(mountpoint, top, noder) + debug = args['--verbose'] + Fuser(mountpoint, top, noder, + debug=debug) -def cmd_index(args, noder, catalog, top): +def cmd_index(args: Dict[str, Any], + noder: Noder, + catalog: Catalog, + top: anytree.AnyNode) -> None: """index action""" path = args[''] name = args[''] @@ -119,7 +128,10 @@ def cmd_index(args, noder, catalog, top): catalog.save(top) -def cmd_update(args, noder, catalog, top): +def cmd_update(args: Dict[str, Any], + noder: Noder, + catalog: Catalog, + top: anytree.AnyNode) -> None: """update action""" path = args[''] name = args[''] @@ -147,7 +159,9 @@ def cmd_update(args, noder, catalog, top): catalog.save(top) -def cmd_ls(args, noder, top): +def cmd_ls(args: Dict[str, Any], + noder: Noder, + top: anytree.AnyNode) -> List[anytree.AnyNode]: """ls action""" path = args[''] if not path: @@ -168,7 +182,8 @@ def cmd_ls(args, noder, top): fmt = args['--format'] if fmt.startswith('fzf'): raise BadFormatException('fzf is not supported in ls, use find') - found = noder.list(top, path, + found = noder.list(top, + path, rec=args['--recursive'], fmt=fmt, raw=args['--raw-size']) @@ -178,7 +193,10 @@ def cmd_ls(args, noder, top): return found -def cmd_rm(args, noder, catalog, top): +def cmd_rm(args: Dict[str, Any], + noder: Noder, + catalog: Catalog, + top: anytree.AnyNode) -> anytree.AnyNode: """rm action""" name = args[''] node = noder.get_storage_node(top, name) @@ -191,7 +209,9 @@ def cmd_rm(args, noder, catalog, top): return top -def cmd_find(args, noder, top): +def cmd_find(args: Dict[str, Any], + noder: Noder, + top: anytree.AnyNode) -> List[anytree.AnyNode]: """find action""" fromtree = args['--parent'] directory = args['--directory'] @@ -200,12 +220,18 @@ def cmd_find(args, noder, top): raw = args['--raw-size'] script = args['--script'] search_for = args[''] - return noder.find_name(top, search_for, script=script, - startpath=startpath, only_dir=directory, - parentfromtree=fromtree, fmt=fmt, raw=raw) + found = noder.find_name(top, search_for, + script=script, + startnode=startpath, + only_dir=directory, + parentfromtree=fromtree, + fmt=fmt, raw=raw) + return found -def cmd_graph(args, noder, top): +def cmd_graph(args: Dict[str, Any], + noder: Noder, + top: anytree.AnyNode) -> None: """graph action""" path = args[''] if not path: @@ -214,7 +240,9 @@ def cmd_graph(args, noder, top): Logger.info(f'create graph with \"{cmd}\" (you need graphviz)') -def cmd_rename(args, catalog, top): +def cmd_rename(args: Dict[str, Any], + catalog: Catalog, + top: anytree.AnyNode) -> None: """rename action""" storage = args[''] new = args[''] @@ -227,10 +255,12 @@ def cmd_rename(args, catalog, top): Logger.info(msg) else: Logger.err(f'Storage named \"{storage}\" does not exist') - return top -def cmd_edit(args, noder, catalog, top): +def cmd_edit(args: Dict[str, Any], + noder: Noder, + catalog: Catalog, + top: anytree.AnyNode) -> None: """edit action""" storage = args[''] storages = list(x.name for x in top.children) @@ -245,16 +275,15 @@ def cmd_edit(args, noder, catalog, top): Logger.info(f'Storage \"{storage}\" edited') else: Logger.err(f'Storage named \"{storage}\" does not exist') - return top -def banner(): +def banner() -> None: """print banner""" Logger.stderr_nocolor(BANNER) Logger.stderr_nocolor("") -def print_supported_formats(): +def print_supported_formats() -> None: """print all supported formats to stdout""" print('"native" : native format') print('"csv" : CSV format') @@ -263,7 +292,7 @@ def print_supported_formats(): print('"fzf-csv" : fzf to csv (only for find)') -def main(): +def main() -> bool: """entry point""" args = docopt(USAGE, version=VERSION) diff --git a/catcli/colors.py b/catcli/colors.py index e53cd6c..07010c5 100644 --- a/catcli/colors.py +++ b/catcli/colors.py @@ -5,6 +5,11 @@ Copyright (c) 2022, deadc0de6 shell colors """ +from typing import TypeVar, Type + + +CLASSTYPE = TypeVar('CLASSTYPE', bound='Colors') + class Colors: """shell colors""" @@ -22,7 +27,7 @@ class Colors: UND = '\033[4m' @classmethod - def no_color(cls): + def no_color(cls: Type[CLASSTYPE]) -> None: """disable colors""" Colors.RED = '' Colors.GREEN = '' diff --git a/catcli/decomp.py b/catcli/decomp.py index 13f4ff4..079a428 100644 --- a/catcli/decomp.py +++ b/catcli/decomp.py @@ -8,12 +8,13 @@ Catcli generic compressed data lister import os import tarfile import zipfile +from typing import List class Decomp: """decompressor""" - def __init__(self): + def __init__(self) -> None: self.ext = { 'tar': self._tar, 'tgz': self._tar, @@ -28,29 +29,29 @@ class Decomp: 'tar.bz2': self._tar, 'zip': self._zip} - def get_formats(self): + def get_formats(self) -> List[str]: """return list of supported extensions""" return list(self.ext.keys()) - def get_names(self, path): + def get_names(self, path: str) -> List[str]: """get tree of compressed archive""" ext = os.path.splitext(path)[1][1:].lower() if ext in list(self.ext): return self.ext[ext](path) - return None + return [] @staticmethod - def _tar(path): + def _tar(path: str) -> List[str]: """return list of file names in tar""" if not tarfile.is_tarfile(path): - return None + return [] with tarfile.open(path, "r") as tar: return tar.getnames() @staticmethod - def _zip(path): + def _zip(path: str) -> List[str]: """return list of file names in zip""" if not zipfile.is_zipfile(path): - return None + return [] with zipfile.ZipFile(path) as file: return file.namelist() diff --git a/catcli/fuser.py b/catcli/fuser.py index 1c62760..0b3d011 100644 --- a/catcli/fuser.py +++ b/catcli/fuser.py @@ -9,7 +9,9 @@ import os import logging from time import time from stat import S_IFDIR, S_IFREG -import fuse +from typing import List, Dict, Any +import anytree # type: ignore +import fuse # type: ignore from .noder import Noder @@ -26,28 +28,33 @@ SEPARATOR = '/' class Fuser: - """fuser filesystem""" + """fuse filesystem mounter""" - def __init__(self, mountpoint, top, noder): + def __init__(self, mountpoint: str, + top: anytree.AnyNode, + noder: Noder, + debug: bool = False): """fuse filesystem""" filesystem = CatcliFilesystem(top, noder) fuse.FUSE(filesystem, mountpoint, - foreground=True, + foreground=debug, allow_other=True, nothreads=True, - debug=True) + debug=debug) -class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): +class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore """in-memory filesystem for catcli catalog""" - def __init__(self, top, noder): + def __init__(self, top: anytree.AnyNode, + noder: Noder): """init fuse filesystem""" self.top = top self.noder = noder - def _get_entry(self, path): + def _get_entry(self, path: str) -> anytree.AnyNode: + """return the node pointed by path""" pre = f'{SEPARATOR}{self.noder.NAME_TOP}' if not path.startswith(pre): path = pre + path @@ -57,9 +64,10 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): raw=True) if found: return found[0] - return [] + return None - def _get_entries(self, path): + def _get_entries(self, path: str) -> List[anytree.AnyNode]: + """return nodes pointed by path""" pre = f'{SEPARATOR}{self.noder.NAME_TOP}' if not path.startswith(pre): path = pre + path @@ -73,13 +81,13 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): raw=True) return found - def _getattr(self, path): + def _getattr(self, path: str) -> Dict[str, Any]: entry = self._get_entry(path) if not entry: - return None + return {} curt = time() - mode = S_IFREG + mode: Any = S_IFREG if entry.type == Noder.TYPE_ARC: mode = S_IFREG elif entry.type == Noder.TYPE_DIR: @@ -103,7 +111,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): 'st_gid': os.getgid(), } - def getattr(self, path, _fh=None): + def getattr(self, path: str, _fh: Any = None) -> Dict[str, Any]: """return attr of file pointed by path""" logger.info('getattr path: %s', path) @@ -124,7 +132,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): meta = self._getattr(path) return meta - def readdir(self, path, _fh): + def readdir(self, path: str, _fh: Any) -> List[str]: """read directory content""" logger.info('readdir path: %s', path) content = ['.', '..'] diff --git a/catcli/logger.py b/catcli/logger.py index 586b033..423a044 100644 --- a/catcli/logger.py +++ b/catcli/logger.py @@ -6,61 +6,75 @@ Logging helper """ import sys +from typing import TypeVar, Type # local imports from catcli.colors import Colors from catcli.utils import fix_badchars +CLASSTYPE = TypeVar('CLASSTYPE', bound='Logger') + + class Logger: """log to stdout/stderr""" @classmethod - def stdout_nocolor(cls, string): + def stdout_nocolor(cls: Type[CLASSTYPE], + string: str) -> None: """to stdout no color""" string = fix_badchars(string) sys.stdout.write(f'{string}\n') @classmethod - def stderr_nocolor(cls, string): + def stderr_nocolor(cls: Type[CLASSTYPE], + string: str) -> None: """to stderr no color""" string = fix_badchars(string) sys.stderr.write(f'{string}\n') @classmethod - def debug(cls, string): + def debug(cls: Type[CLASSTYPE], + string: str) -> None: """to stderr no color""" cls.stderr_nocolor(f'[DBG] {string}\n') @classmethod - def info(cls, string): + def info(cls: Type[CLASSTYPE], + string: str) -> None: """to stdout in color""" string = fix_badchars(string) out = f'{Colors.MAGENTA}{string}{Colors.RESET}' sys.stdout.write(f'{out}\n') @classmethod - def err(cls, string): + def err(cls: Type[CLASSTYPE], + string: str) -> None: """to stderr in RED""" string = fix_badchars(string) out = f'{Colors.RED}{string}{Colors.RESET}' sys.stderr.write(f'{out}\n') @classmethod - def progr(cls, string): + def progr(cls: Type[CLASSTYPE], + string: str) -> None: """print progress""" string = fix_badchars(string) sys.stderr.write(f'{string}\r') sys.stderr.flush() @classmethod - def get_bold_text(cls, string): + def get_bold_text(cls: Type[CLASSTYPE], + string: str) -> str: """make it bold""" string = fix_badchars(string) return f'{Colors.BOLD}{string}{Colors.RESET}' @classmethod - def log_to_file(cls, path, string, append=True): + def log_to_file(cls: Type[CLASSTYPE], + path: str, + string: str, + append: bool = True) -> None: """log to file""" string = fix_badchars(string) mode = 'w' diff --git a/catcli/nodeprinter.py b/catcli/nodeprinter.py index b43613b..26505e6 100644 --- a/catcli/nodeprinter.py +++ b/catcli/nodeprinter.py @@ -6,11 +6,15 @@ Class for printing nodes """ import sys +from typing import TypeVar, Type, Optional, Tuple, List from catcli.colors import Colors from catcli.utils import fix_badchars +CLASSTYPE = TypeVar('CLASSTYPE', bound='NodePrinter') + + class NodePrinter: """a node printer class""" @@ -19,7 +23,9 @@ class NodePrinter: NBFILES = 'nbfiles' @classmethod - def print_storage_native(cls, pre, name, args, attr): + def print_storage_native(cls: Type[CLASSTYPE], pre: str, + name: str, args: str, + attr: str) -> None: """print a storage node""" end = '' if attr: @@ -31,7 +37,8 @@ class NodePrinter: sys.stdout.write(f'{out}\n') @classmethod - def print_file_native(cls, pre, name, attr): + def print_file_native(cls: Type[CLASSTYPE], pre: str, + name: str, attr: str) -> None: """print a file node""" nobad = fix_badchars(name) out = f'{pre}{nobad}' @@ -39,22 +46,26 @@ class NodePrinter: sys.stdout.write(f'{out}\n') @classmethod - def print_dir_native(cls, pre, name, depth='', attr=None): + def print_dir_native(cls: Type[CLASSTYPE], pre: str, + name: str, + depth: int = 0, + attr: Optional[List[Tuple[str, str]]] = None) -> None: """print a directory node""" end = [] - if depth != '': + if depth > 0: end.append(f'{cls.NBFILES}:{depth}') if attr: end.append(' '.join([f'{x}:{y}' for x, y in attr])) + end_string = '' if end: - endstring = ', '.join(end) - end = f' [{endstring}]' + end_string = f' [{", ".join(end)}]' out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET - out += f'{Colors.GRAY}{end}{Colors.RESET}' + out += f'{Colors.GRAY}{end_string}{Colors.RESET}' sys.stdout.write(f'{out}\n') @classmethod - def print_archive_native(cls, pre, name, archive): + def print_archive_native(cls: Type[CLASSTYPE], pre: str, + name: str, archive: str) -> None: """archive to stdout""" out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}' diff --git a/catcli/noder.py b/catcli/noder.py index 7ad39bd..8624e8b 100644 --- a/catcli/noder.py +++ b/catcli/noder.py @@ -2,14 +2,15 @@ author: deadc0de6 (https://github.com/deadc0de6) Copyright (c) 2017, deadc0de6 -Class that represents a node in the catalog tree +Class that process nodes in the catalog tree """ import os import shutil import time -import anytree -from pyfzf.pyfzf import FzfPrompt +from typing import List, Union, Tuple, Any, Optional, Dict +import anytree # type: ignore +from pyfzf.pyfzf import FzfPrompt # type: ignore # local imports from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars @@ -44,7 +45,9 @@ class Noder: 'maccess,md5,nbfiles,free_space,' 'total_space,meta') - def __init__(self, debug=False, sortsize=False, arc=False): + def __init__(self, debug: bool = False, + sortsize: bool = False, + arc: bool = False) -> None: """ @debug: debug mode @sortsize: sort nodes by size @@ -58,11 +61,12 @@ class Noder: self.decomp = Decomp() @staticmethod - def get_storage_names(top): + def get_storage_names(top: anytree.AnyNode) -> List[str]: """return a list of all storage names""" return [x.name for x in list(top.children)] - def get_storage_node(self, top, name, path=None): + def get_storage_node(self, top: anytree.AnyNode, + name: str, path: str = '') -> anytree.AnyNode: """ return the storage node if any if path is submitted, it will update the media info @@ -81,7 +85,8 @@ class Noder: return found @staticmethod - def get_node(top, path, quiet=False): + def get_node(top: str, path: str, + quiet: bool = False) -> anytree.AnyNode: """get the node by internal tree path""" resolv = anytree.resolver.Resolver('name') try: @@ -92,7 +97,10 @@ class Noder: Logger.err(f'No node at path \"{bpath}\"') return None - def get_node_if_changed(self, top, path, treepath): + def get_node_if_changed(self, + top: anytree.AnyNode, + path: str, + treepath: str) -> Tuple[anytree.AnyNode, bool]: """ return the node (if any) and if it has changed @top: top node (storage) @@ -128,17 +136,18 @@ class Noder: self._debug(f'\tchange: no change for \"{path}\"') return node, False - def rec_size(self, node, store=True): + def rec_size(self, node: anytree.AnyNode, + store: bool = True) -> float: """ recursively traverse tree and return size @store: store the size in the node """ if node.type == self.TYPE_FILE: self._debug(f'getting node size for \"{node.name}\"') - return node.size + return float(node.size) msg = f'getting node size recursively for \"{node.name}\"' self._debug(msg) - size = 0 + size: float = 0 for i in node.children: if node.type == self.TYPE_DIR: size = self.rec_size(i, store=store) @@ -160,7 +169,7 @@ class Noder: # public helpers ############################################################### @staticmethod - def format_storage_attr(attr): + def format_storage_attr(attr: Union[str, List[str]]) -> str: """format the storage attr for saving""" if not attr: return '' @@ -169,18 +178,19 @@ class Noder: attr = attr.rstrip() return attr - def set_hashing(self, val): + def set_hashing(self, val: bool) -> None: """hash files when indexing""" self.hash = val ############################################################### # node creation ############################################################### - def new_top_node(self): + def new_top_node(self) -> anytree.AnyNode: """create a new top node""" return anytree.AnyNode(name=self.NAME_TOP, type=self.TYPE_TOP) - def new_file_node(self, name, path, parent, storagepath): + def new_file_node(self, name: str, path: str, + parent: str, storagepath: str) -> anytree.AnyNode: """create a new node representing a file""" if not os.path.exists(path): Logger.err(f'File \"{path}\" does not exist') @@ -191,14 +201,16 @@ class Noder: except OSError as exc: Logger.err(f'OSError: {exc}') return None - md5 = None + md5 = '' if self.hash: md5 = self._get_hash(path) relpath = os.sep.join([storagepath, name]) maccess = os.path.getmtime(path) - node = self._new_generic_node(name, self.TYPE_FILE, relpath, parent, - size=stat.st_size, md5=md5, + node = self._new_generic_node(name, self.TYPE_FILE, + relpath, parent, + size=stat.st_size, + md5=md5, maccess=maccess) if self.arc: ext = os.path.splitext(path)[1][1:] @@ -210,7 +222,8 @@ class Noder: self._debug(f'{path} is NOT an archive') return node - def new_dir_node(self, name, path, parent, storagepath): + def new_dir_node(self, name: str, path: str, + parent: str, storagepath: str) -> anytree.AnyNode: """create a new node representing a directory""" path = os.path.abspath(path) relpath = os.sep.join([storagepath, name]) @@ -218,7 +231,10 @@ class Noder: return self._new_generic_node(name, self.TYPE_DIR, relpath, parent, maccess=maccess) - def new_storage_node(self, name, path, parent, attr=None): + def new_storage_node(self, name: str, + path: str, + parent: str, + attr: Optional[str] = None) -> anytree.AnyNode: """create a new node representing a storage""" path = os.path.abspath(path) free = shutil.disk_usage(path).free @@ -227,15 +243,19 @@ class Noder: return anytree.AnyNode(name=name, type=self.TYPE_STORAGE, free=free, total=total, parent=parent, attr=attr, ts=epoch) - def new_archive_node(self, name, path, parent, archive): + def new_archive_node(self, name: str, path: str, + parent: str, archive: str) -> anytree.AnyNode: """create a new node for archive data""" return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path, parent=parent, size=0, md5=None, archive=archive) @staticmethod - def _new_generic_node(name, nodetype, relpath, parent, - size=None, md5=None, maccess=None): + def _new_generic_node(name: str, nodetype: str, + relpath: str, parent: str, + size: float = 0, + md5: str = '', + maccess: float = 0) -> anytree.AnyNode: """generic node creation""" return anytree.AnyNode(name=name, type=nodetype, relpath=relpath, parent=parent, size=size, @@ -244,21 +264,22 @@ class Noder: ############################################################### # node management ############################################################### - def update_metanode(self, top): + def update_metanode(self, top: anytree.AnyNode) -> anytree.AnyNode: """create or update meta node information""" meta = self._get_meta_node(top) epoch = int(time.time()) if not meta: - attr = {} + attr: Dict[str, Any] = {} attr['created'] = epoch attr['created_version'] = VERSION - meta = anytree.AnyNode(name=self.NAME_META, type=self.TYPE_META, + meta = anytree.AnyNode(name=self.NAME_META, + type=self.TYPE_META, attr=attr) meta.attr['access'] = epoch meta.attr['access_version'] = VERSION return meta - def _get_meta_node(self, top): + def _get_meta_node(self, top: anytree.AnyNode) -> anytree.AnyNode: """return the meta node if any""" try: return next(filter(lambda x: x.type == self.TYPE_META, @@ -266,7 +287,7 @@ class Noder: except StopIteration: return None - def clean_not_flagged(self, top): + def clean_not_flagged(self, top: anytree.AnyNode) -> int: """remove any node not flagged and clean flags""" cnt = 0 for node in anytree.PreOrderIter(top): @@ -277,11 +298,11 @@ class Noder: return cnt @staticmethod - def flag(node): + def flag(node: anytree.AnyNode) -> None: """flag a node""" node.flag = True - def _clean(self, node): + def _clean(self, node: anytree.AnyNode) -> bool: """remove node if not flagged""" if not self._has_attr(node, 'flag') or \ not node.flag: @@ -293,7 +314,9 @@ class Noder: ############################################################### # printing ############################################################### - def _node_to_csv(self, node, sep=',', raw=False): + def _node_to_csv(self, node: anytree.AnyNode, + sep: str = ',', + raw: bool = False) -> None: """ print a node to csv @node: the node to consider @@ -353,9 +376,13 @@ class Noder: if len(line) > 0: Logger.stdout_nocolor(line) - def _print_node_native(self, node, pre='', withpath=False, - withdepth=False, withstorage=False, - recalcparent=False, raw=False): + def _print_node_native(self, node: anytree.AnyNode, + pre: str = '', + withpath: bool = False, + withdepth: bool = False, + withstorage: bool = False, + recalcparent: bool = False, + raw: bool = False) -> None: """ print a node @node: the node to print @@ -380,11 +407,11 @@ class Noder: name = name.lstrip(os.sep) if withstorage: storage = self._get_storage(node) - attr = '' + attr_str = '' if node.md5: - attr = f', md5:{node.md5}' + attr_str = f', md5:{node.md5}' size = size_to_str(node.size, raw=raw) - compl = f'size:{size}{attr}' + compl = f'size:{size}{attr_str}' if withstorage: content = Logger.get_bold_text(storage.name) compl += f', storage:{content}' @@ -398,16 +425,16 @@ class Noder: else: name = node.relpath name = name.lstrip(os.sep) - depth = '' + depth = 0 if withdepth: depth = len(node.children) if withstorage: storage = self._get_storage(node) - attr = [] + attr: List[Tuple[str, str]] = [] if node.size: - attr.append(['totsize', size_to_str(node.size, raw=raw)]) + attr.append(('totsize', size_to_str(node.size, raw=raw))) if withstorage: - attr.append(['storage', Logger.get_bold_text(storage.name)]) + attr.append(('storage', Logger.get_bold_text(storage.name))) NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr) elif node.type == self.TYPE_STORAGE: # node of type storage @@ -423,9 +450,9 @@ class Noder: timestamp += epoch_to_str(node.ts) disksize = '' # the children size - size = self.rec_size(node, store=False) - size = size_to_str(size, raw=raw) - disksize = 'totsize:' + f'{size}' + recsize = self.rec_size(node, store=False) + sizestr = size_to_str(recsize, raw=raw) + disksize = 'totsize:' + f'{sizestr}' # format the output name = node.name args = [ @@ -446,9 +473,9 @@ class Noder: else: Logger.err(f'bad node encountered: {node}') - def print_tree(self, node, - fmt='native', - raw=False): + def print_tree(self, node: anytree.AnyNode, + fmt: str = 'native', + raw: bool = False) -> None: """ print the tree in different format @node: start node @@ -470,20 +497,21 @@ class Noder: Logger.stdout_nocolor(self.CSV_HEADER) self._to_csv(node, raw=raw) - def _to_csv(self, node, raw=False): + def _to_csv(self, node: anytree.AnyNode, + raw: bool = False) -> None: """print the tree to csv""" rend = anytree.RenderTree(node, childiter=self._sort_tree) for _, _, item in rend: self._node_to_csv(item, raw=raw) @staticmethod - def _fzf_prompt(strings): + def _fzf_prompt(strings: Any) -> Any: # prompt with fzf fzf = FzfPrompt() selected = fzf.prompt(strings) return selected - def _to_fzf(self, node, fmt): + def _to_fzf(self, node: anytree.AnyNode, fmt: str) -> None: """ fzf prompt with list and print selected node(s) @node: node to start with @@ -512,7 +540,8 @@ class Noder: self.print_tree(rend, fmt=subfmt) @staticmethod - def to_dot(node, path='tree.dot'): + def to_dot(node: anytree.AnyNode, + path: str = 'tree.dot') -> str: """export to dot for graphing""" anytree.exporter.DotExporter(node).to_dotfile(path) Logger.info(f'dot file created under \"{path}\"') @@ -521,10 +550,14 @@ class Noder: ############################################################### # searching ############################################################### - def find_name(self, top, key, - script=False, only_dir=False, - startpath=None, parentfromtree=False, - fmt='native', raw=False): + def find_name(self, top: anytree.AnyNode, + key: str, + script: bool = False, + only_dir: bool = False, + startnode: anytree.AnyNode = None, + parentfromtree: bool = False, + fmt: str = 'native', + raw: bool = False) -> List[anytree.AnyNode]: """ find files based on their names @top: top node @@ -541,8 +574,8 @@ class Noder: # search for nodes based on path start = top - if startpath: - start = self.get_node(top, startpath) + if startnode: + start = self.get_node(top, startnode) filterfunc = self._callback_find_name(key, only_dir) found = anytree.findall(start, filter_=filterfunc) nbfound = len(found) @@ -591,9 +624,9 @@ class Noder: return list(paths.values()) - def _callback_find_name(self, term, only_dir): + def _callback_find_name(self, term: str, only_dir: bool) -> Any: """callback for finding files""" - def find_name(node): + def find_name(node: anytree.AnyNode) -> bool: if node.type == self.TYPE_STORAGE: # ignore storage nodes return False @@ -620,10 +653,11 @@ class Noder: ############################################################### # ls ############################################################### - def list(self, top, path, - rec=False, - fmt='native', - raw=False): + def list(self, top: anytree.AnyNode, + path: str, + rec: bool = False, + fmt: str = 'native', + raw: bool = False) -> List[anytree.AnyNode]: """ list nodes for "ls" @top: top node @@ -684,7 +718,9 @@ class Noder: ############################################################### # tree creation ############################################################### - def _add_entry(self, name, top, resolv): + def _add_entry(self, name: str, + top: anytree.AnyNode, + resolv: Any) -> None: """add an entry to the tree""" entries = name.rstrip(os.sep).split(os.sep) if len(entries) == 1: @@ -698,7 +734,7 @@ class Noder: except anytree.resolver.ChildResolverError: self.new_archive_node(nodename, name, top, top.name) - def list_to_tree(self, parent, names): + def list_to_tree(self, parent: anytree.AnyNode, names: List[str]) -> None: """convert list of files to a tree""" if not names: return @@ -710,43 +746,44 @@ class Noder: ############################################################### # diverse ############################################################### - def _sort_tree(self, items): + def _sort_tree(self, + items: List[anytree.AnyNode]) -> List[anytree.AnyNode]: """sorting a list of items""" return sorted(items, key=self._sort, reverse=self.sortsize) - def _sort(self, lst): + def _sort(self, lst: List[anytree.AnyNode]) -> Any: """sort a list""" if self.sortsize: return self._sort_size(lst) return self._sort_fs(lst) @staticmethod - def _sort_fs(node): + def _sort_fs(node: anytree.AnyNode) -> Tuple[str, str]: """sorting nodes dir first and alpha""" return (node.type, node.name.lstrip('.').lower()) @staticmethod - def _sort_size(node): + def _sort_size(node: anytree.AnyNode) -> float: """sorting nodes by size""" try: if not node.size: return 0 - return node.size + return float(node.size) except AttributeError: return 0 - def _get_storage(self, node): + def _get_storage(self, node: anytree.AnyNode) -> anytree.AnyNode: """recursively traverse up to find storage""" if node.type == self.TYPE_STORAGE: return node return node.ancestors[1] @staticmethod - def _has_attr(node, attr): + def _has_attr(node: anytree.AnyNode, attr: str) -> bool: """return True if node has attr as attribute""" return attr in node.__dict__.keys() - def _get_parents(self, node): + def _get_parents(self, node: anytree.AnyNode) -> str: """get all parents recursively""" if node.type == self.TYPE_STORAGE: return '' @@ -755,25 +792,25 @@ class Noder: parent = self._get_parents(node.parent) if parent: return os.sep.join([parent, node.name]) - return node.name + return str(node.name) @staticmethod - def _get_hash(path): + def _get_hash(path: str) -> str: """return md5 hash of node""" try: return md5sum(path) except CatcliException as exc: Logger.err(str(exc)) - return None + return '' @staticmethod - def _sanitize(node): + def _sanitize(node: anytree.AnyNode) -> anytree.AnyNode: """sanitize node strings""" node.name = fix_badchars(node.name) node.relpath = fix_badchars(node.relpath) return node - def _debug(self, string): + def _debug(self, string: str) -> None: """print debug""" if not self.debug: return diff --git a/catcli/utils.py b/catcli/utils.py index 1180455..38a257e 100644 --- a/catcli/utils.py +++ b/catcli/utils.py @@ -18,7 +18,7 @@ from catcli.exceptions import CatcliException SEPARATOR = '/' -def md5sum(path): +def md5sum(path: str) -> str: """ calculate md5 sum of a file may raise exception @@ -39,10 +39,11 @@ def md5sum(path): pass except OSError as exc: raise CatcliException(f'md5sum error: {exc}') from exc - return None + return '' -def size_to_str(size, raw=True): +def size_to_str(size: float, + raw: bool = True) -> str: """convert size to string, optionally human readable""" div = 1024. suf = ['B', 'K', 'M', 'G', 'T', 'P'] @@ -56,7 +57,7 @@ def size_to_str(size, raw=True): return f'{size:.1f}{sufix}' -def epoch_to_str(epoch): +def epoch_to_str(epoch: int) -> str: """convert epoch to string""" if not epoch: return '' @@ -65,18 +66,18 @@ def epoch_to_str(epoch): return timestamp.strftime(fmt) -def ask(question): +def ask(question: str) -> bool: """ask the user what to do""" resp = input(f'{question} [y|N] ? ') return resp.lower() == 'y' -def edit(string): +def edit(string: str) -> str: """edit the information with the default EDITOR""" - string = string.encode('utf-8') + data = string.encode('utf-8') editor = os.environ.get('EDITOR', 'vim') with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file: - file.write(string) + file.write(data) file.flush() subprocess.call([editor, file.name]) file.seek(0) @@ -84,6 +85,6 @@ def edit(string): return new.decode('utf-8') -def fix_badchars(string): +def fix_badchars(string: str) -> str: """fix none utf-8 chars in string""" return string.encode('utf-8', 'ignore').decode('utf-8') diff --git a/catcli/walker.py b/catcli/walker.py index 4740faf..4d9f79d 100644 --- a/catcli/walker.py +++ b/catcli/walker.py @@ -6,8 +6,11 @@ Catcli filesystem indexer """ import os +from typing import Tuple +import anytree # type: ignore # local imports +from catcli.noder import Noder from catcli.logger import Logger @@ -16,8 +19,10 @@ class Walker: MAXLINELEN = 80 - 15 - def __init__(self, noder, usehash=True, debug=False, - logpath=None): + def __init__(self, noder: Noder, + usehash: bool = True, + debug: bool = False, + logpath: str = ''): """ @noder: the noder to use @hash: calculate hash of nodes @@ -30,7 +35,10 @@ class Walker: self.debug = debug self.lpath = logpath - def index(self, path, parent, name, storagepath=''): + def index(self, path: str, + parent: str, + name: str, + storagepath: str = '') -> Tuple[str, int]: """ index a directory and store in tree @path: path to index @@ -39,7 +47,8 @@ class Walker: """ self._debug(f'indexing starting at {path}') if not parent: - parent = self.noder.new_dir_node(name, path, parent) + parent = self.noder.new_dir_node(name, path, + parent, storagepath) if os.path.islink(path): rel = os.readlink(path) @@ -77,16 +86,19 @@ class Walker: _, cnt2 = self.index(sub, dummy, base, nstoragepath) cnt += cnt2 break - self._progress(None) + self._progress('') return parent, cnt - def reindex(self, path, parent, top): + def reindex(self, path: str, parent: str, top: str) -> int: """reindex a directory and store in tree""" cnt = self._reindex(path, parent, top) cnt += self.noder.clean_not_flagged(parent) return cnt - def _reindex(self, path, parent, top, storagepath=''): + def _reindex(self, path: str, + parent: str, + top: anytree.AnyNode, + storagepath: str = '') -> int: """ reindex a directory and store in tree @path: directory path to re-index @@ -131,7 +143,10 @@ class Walker: break return cnt - def _need_reindex(self, top, path, treepath): + def _need_reindex(self, + top: anytree.AnyNode, + path: str, + treepath: str) -> Tuple[bool, anytree.AnyNode]: """ test if node needs re-indexing @top: top node (storage) @@ -153,13 +168,13 @@ class Walker: cnode.parent = None return True, cnode - def _debug(self, string): + def _debug(self, string: str) -> None: """print to debug""" if not self.debug: return Logger.debug(string) - def _progress(self, string): + def _progress(self, string: str) -> None: """print progress""" if self.debug: return @@ -171,7 +186,7 @@ class Walker: string = string[:self.MAXLINELEN] + '...' Logger.progr(f'indexing: {string:80}') - def _log2file(self, string): + def _log2file(self, string: str) -> None: """log to file""" if not self.lpath: return