pull/30/head
deadc0de6 1 year ago
parent 047c9bf4ab
commit 4a9e565e74

@ -7,7 +7,7 @@ Copyright (c) 2017, deadc0de6
import sys
def main():
def main() -> None:
"""entry point"""
import catcli.catcli
if catcli.catcli.main():

@ -7,8 +7,9 @@ Class that represents the catcli catalog
import os
import pickle
from anytree.exporter import JsonExporter
from anytree.importer import JsonImporter
import anytree # type: ignore
from anytree.exporter import JsonExporter # type: ignore
from anytree.importer import JsonImporter # type: ignore
# local imports
from catcli.utils import ask
@ -18,7 +19,10 @@ from catcli.logger import Logger
class Catalog:
"""the catalog"""
def __init__(self, path, usepickle=False, debug=False, force=False):
def __init__(self, path: str,
usepickle: bool = False,
debug: bool = False,
force: bool = False) -> None:
"""
@path: catalog path
@usepickle: use pickle
@ -31,12 +35,13 @@ class Catalog:
self.metanode = None
self.pickle = usepickle
def set_metanode(self, metanode):
def set_metanode(self, metanode: anytree.AnyNode) -> None:
"""remove the metanode until tree is re-written"""
self.metanode = metanode
self.metanode.parent = None
if self.metanode:
self.metanode.parent = None
def exists(self):
def exists(self) -> bool:
"""does catalog exist"""
if not self.path:
return False
@ -44,7 +49,7 @@ class Catalog:
return True
return False
def restore(self):
def restore(self) -> anytree.AnyNode:
"""restore the catalog"""
if not self.path:
return None
@ -56,7 +61,7 @@ class Catalog:
content = file.read()
return self._restore_json(content)
def save(self, node):
def save(self, node: anytree.AnyNode) -> bool:
"""save the catalog"""
if not self.path:
Logger.err('Path not defined')
@ -77,19 +82,19 @@ class Catalog:
return self._save_pickle(node)
return self._save_json(node)
def _debug(self, text):
def _debug(self, text: str) -> None:
if not self.debug:
return
Logger.debug(text)
def _save_pickle(self, node):
def _save_pickle(self, node: anytree.AnyNode) -> bool:
"""pickle the catalog"""
with open(self.path, 'wb') as file:
pickle.dump(node, file)
self._debug(f'Catalog saved to pickle \"{self.path}\"')
return True
def _restore_pickle(self):
def _restore_pickle(self) -> anytree.AnyNode:
"""restore the pickled tree"""
with open(self.path, 'rb') as file:
root = pickle.load(file)
@ -97,7 +102,7 @@ class Catalog:
self._debug(msg)
return root
def _save_json(self, node):
def _save_json(self, node: anytree.AnyNode) -> bool:
"""export the catalog in json"""
exp = JsonExporter(indent=2, sort_keys=True)
with open(self.path, 'w', encoding='UTF-8') as file:
@ -105,7 +110,7 @@ class Catalog:
self._debug(f'Catalog saved to json \"{self.path}\"')
return True
def _restore_json(self, string):
def _restore_json(self, string: str) -> anytree.AnyNode:
"""restore the tree from json"""
imp = JsonImporter()
root = imp.import_(string)

@ -11,6 +11,8 @@ Catcli command line interface
import sys
import os
import datetime
from typing import Dict, Any, List
import anytree # type: ignore
from docopt import docopt
# local imports
@ -78,13 +80,20 @@ Options:
""" # nopep8
def cmd_mount(args, top, noder):
def cmd_mount(args: Dict[str, Any],
top: anytree.AnyNode,
noder: Noder) -> None:
"""mount action"""
mountpoint = args['<mountpoint>']
Fuser(mountpoint, top, noder)
debug = args['--verbose']
Fuser(mountpoint, top, noder,
debug=debug)
def cmd_index(args, noder, catalog, top):
def cmd_index(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: anytree.AnyNode) -> None:
"""index action"""
path = args['<path>']
name = args['<name>']
@ -119,7 +128,10 @@ def cmd_index(args, noder, catalog, top):
catalog.save(top)
def cmd_update(args, noder, catalog, top):
def cmd_update(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: anytree.AnyNode) -> None:
"""update action"""
path = args['<path>']
name = args['<name>']
@ -147,7 +159,9 @@ def cmd_update(args, noder, catalog, top):
catalog.save(top)
def cmd_ls(args, noder, top):
def cmd_ls(args: Dict[str, Any],
noder: Noder,
top: anytree.AnyNode) -> List[anytree.AnyNode]:
"""ls action"""
path = args['<path>']
if not path:
@ -168,7 +182,8 @@ def cmd_ls(args, noder, top):
fmt = args['--format']
if fmt.startswith('fzf'):
raise BadFormatException('fzf is not supported in ls, use find')
found = noder.list(top, path,
found = noder.list(top,
path,
rec=args['--recursive'],
fmt=fmt,
raw=args['--raw-size'])
@ -178,7 +193,10 @@ def cmd_ls(args, noder, top):
return found
def cmd_rm(args, noder, catalog, top):
def cmd_rm(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: anytree.AnyNode) -> anytree.AnyNode:
"""rm action"""
name = args['<storage>']
node = noder.get_storage_node(top, name)
@ -191,7 +209,9 @@ def cmd_rm(args, noder, catalog, top):
return top
def cmd_find(args, noder, top):
def cmd_find(args: Dict[str, Any],
noder: Noder,
top: anytree.AnyNode) -> List[anytree.AnyNode]:
"""find action"""
fromtree = args['--parent']
directory = args['--directory']
@ -200,12 +220,18 @@ def cmd_find(args, noder, top):
raw = args['--raw-size']
script = args['--script']
search_for = args['<term>']
return noder.find_name(top, search_for, script=script,
startpath=startpath, only_dir=directory,
parentfromtree=fromtree, fmt=fmt, raw=raw)
found = noder.find_name(top, search_for,
script=script,
startnode=startpath,
only_dir=directory,
parentfromtree=fromtree,
fmt=fmt, raw=raw)
return found
def cmd_graph(args, noder, top):
def cmd_graph(args: Dict[str, Any],
noder: Noder,
top: anytree.AnyNode) -> None:
"""graph action"""
path = args['<path>']
if not path:
@ -214,7 +240,9 @@ def cmd_graph(args, noder, top):
Logger.info(f'create graph with \"{cmd}\" (you need graphviz)')
def cmd_rename(args, catalog, top):
def cmd_rename(args: Dict[str, Any],
catalog: Catalog,
top: anytree.AnyNode) -> None:
"""rename action"""
storage = args['<storage>']
new = args['<name>']
@ -227,10 +255,12 @@ def cmd_rename(args, catalog, top):
Logger.info(msg)
else:
Logger.err(f'Storage named \"{storage}\" does not exist')
return top
def cmd_edit(args, noder, catalog, top):
def cmd_edit(args: Dict[str, Any],
noder: Noder,
catalog: Catalog,
top: anytree.AnyNode) -> None:
"""edit action"""
storage = args['<storage>']
storages = list(x.name for x in top.children)
@ -245,16 +275,15 @@ def cmd_edit(args, noder, catalog, top):
Logger.info(f'Storage \"{storage}\" edited')
else:
Logger.err(f'Storage named \"{storage}\" does not exist')
return top
def banner():
def banner() -> None:
"""print banner"""
Logger.stderr_nocolor(BANNER)
Logger.stderr_nocolor("")
def print_supported_formats():
def print_supported_formats() -> None:
"""print all supported formats to stdout"""
print('"native" : native format')
print('"csv" : CSV format')
@ -263,7 +292,7 @@ def print_supported_formats():
print('"fzf-csv" : fzf to csv (only for find)')
def main():
def main() -> bool:
"""entry point"""
args = docopt(USAGE, version=VERSION)

@ -5,6 +5,11 @@ Copyright (c) 2022, deadc0de6
shell colors
"""
from typing import TypeVar, Type
CLASSTYPE = TypeVar('CLASSTYPE', bound='Colors')
class Colors:
"""shell colors"""
@ -22,7 +27,7 @@ class Colors:
UND = '\033[4m'
@classmethod
def no_color(cls):
def no_color(cls: Type[CLASSTYPE]) -> None:
"""disable colors"""
Colors.RED = ''
Colors.GREEN = ''

@ -8,12 +8,13 @@ Catcli generic compressed data lister
import os
import tarfile
import zipfile
from typing import List
class Decomp:
"""decompressor"""
def __init__(self):
def __init__(self) -> None:
self.ext = {
'tar': self._tar,
'tgz': self._tar,
@ -28,29 +29,29 @@ class Decomp:
'tar.bz2': self._tar,
'zip': self._zip}
def get_formats(self):
def get_formats(self) -> List[str]:
"""return list of supported extensions"""
return list(self.ext.keys())
def get_names(self, path):
def get_names(self, path: str) -> List[str]:
"""get tree of compressed archive"""
ext = os.path.splitext(path)[1][1:].lower()
if ext in list(self.ext):
return self.ext[ext](path)
return None
return []
@staticmethod
def _tar(path):
def _tar(path: str) -> List[str]:
"""return list of file names in tar"""
if not tarfile.is_tarfile(path):
return None
return []
with tarfile.open(path, "r") as tar:
return tar.getnames()
@staticmethod
def _zip(path):
def _zip(path: str) -> List[str]:
"""return list of file names in zip"""
if not zipfile.is_zipfile(path):
return None
return []
with zipfile.ZipFile(path) as file:
return file.namelist()

@ -9,7 +9,9 @@ import os
import logging
from time import time
from stat import S_IFDIR, S_IFREG
import fuse
from typing import List, Dict, Any
import anytree # type: ignore
import fuse # type: ignore
from .noder import Noder
@ -26,28 +28,33 @@ SEPARATOR = '/'
class Fuser:
"""fuser filesystem"""
"""fuse filesystem mounter"""
def __init__(self, mountpoint, top, noder):
def __init__(self, mountpoint: str,
top: anytree.AnyNode,
noder: Noder,
debug: bool = False):
"""fuse filesystem"""
filesystem = CatcliFilesystem(top, noder)
fuse.FUSE(filesystem,
mountpoint,
foreground=True,
foreground=debug,
allow_other=True,
nothreads=True,
debug=True)
debug=debug)
class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations):
class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations): # type: ignore
"""in-memory filesystem for catcli catalog"""
def __init__(self, top, noder):
def __init__(self, top: anytree.AnyNode,
noder: Noder):
"""init fuse filesystem"""
self.top = top
self.noder = noder
def _get_entry(self, path):
def _get_entry(self, path: str) -> anytree.AnyNode:
"""return the node pointed by path"""
pre = f'{SEPARATOR}{self.noder.NAME_TOP}'
if not path.startswith(pre):
path = pre + path
@ -57,9 +64,10 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations):
raw=True)
if found:
return found[0]
return []
return None
def _get_entries(self, path):
def _get_entries(self, path: str) -> List[anytree.AnyNode]:
"""return nodes pointed by path"""
pre = f'{SEPARATOR}{self.noder.NAME_TOP}'
if not path.startswith(pre):
path = pre + path
@ -73,13 +81,13 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations):
raw=True)
return found
def _getattr(self, path):
def _getattr(self, path: str) -> Dict[str, Any]:
entry = self._get_entry(path)
if not entry:
return None
return {}
curt = time()
mode = S_IFREG
mode: Any = S_IFREG
if entry.type == Noder.TYPE_ARC:
mode = S_IFREG
elif entry.type == Noder.TYPE_DIR:
@ -103,7 +111,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations):
'st_gid': os.getgid(),
}
def getattr(self, path, _fh=None):
def getattr(self, path: str, _fh: Any = None) -> Dict[str, Any]:
"""return attr of file pointed by path"""
logger.info('getattr path: %s', path)
@ -124,7 +132,7 @@ class CatcliFilesystem(fuse.LoggingMixIn, fuse.Operations):
meta = self._getattr(path)
return meta
def readdir(self, path, _fh):
def readdir(self, path: str, _fh: Any) -> List[str]:
"""read directory content"""
logger.info('readdir path: %s', path)
content = ['.', '..']

@ -6,61 +6,75 @@ Logging helper
"""
import sys
from typing import TypeVar, Type
# local imports
from catcli.colors import Colors
from catcli.utils import fix_badchars
CLASSTYPE = TypeVar('CLASSTYPE', bound='Logger')
class Logger:
"""log to stdout/stderr"""
@classmethod
def stdout_nocolor(cls, string):
def stdout_nocolor(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stdout no color"""
string = fix_badchars(string)
sys.stdout.write(f'{string}\n')
@classmethod
def stderr_nocolor(cls, string):
def stderr_nocolor(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stderr no color"""
string = fix_badchars(string)
sys.stderr.write(f'{string}\n')
@classmethod
def debug(cls, string):
def debug(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stderr no color"""
cls.stderr_nocolor(f'[DBG] {string}\n')
@classmethod
def info(cls, string):
def info(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stdout in color"""
string = fix_badchars(string)
out = f'{Colors.MAGENTA}{string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def err(cls, string):
def err(cls: Type[CLASSTYPE],
string: str) -> None:
"""to stderr in RED"""
string = fix_badchars(string)
out = f'{Colors.RED}{string}{Colors.RESET}'
sys.stderr.write(f'{out}\n')
@classmethod
def progr(cls, string):
def progr(cls: Type[CLASSTYPE],
string: str) -> None:
"""print progress"""
string = fix_badchars(string)
sys.stderr.write(f'{string}\r')
sys.stderr.flush()
@classmethod
def get_bold_text(cls, string):
def get_bold_text(cls: Type[CLASSTYPE],
string: str) -> str:
"""make it bold"""
string = fix_badchars(string)
return f'{Colors.BOLD}{string}{Colors.RESET}'
@classmethod
def log_to_file(cls, path, string, append=True):
def log_to_file(cls: Type[CLASSTYPE],
path: str,
string: str,
append: bool = True) -> None:
"""log to file"""
string = fix_badchars(string)
mode = 'w'

@ -6,11 +6,15 @@ Class for printing nodes
"""
import sys
from typing import TypeVar, Type, Optional, Tuple, List
from catcli.colors import Colors
from catcli.utils import fix_badchars
CLASSTYPE = TypeVar('CLASSTYPE', bound='NodePrinter')
class NodePrinter:
"""a node printer class"""
@ -19,7 +23,9 @@ class NodePrinter:
NBFILES = 'nbfiles'
@classmethod
def print_storage_native(cls, pre, name, args, attr):
def print_storage_native(cls: Type[CLASSTYPE], pre: str,
name: str, args: str,
attr: str) -> None:
"""print a storage node"""
end = ''
if attr:
@ -31,7 +37,8 @@ class NodePrinter:
sys.stdout.write(f'{out}\n')
@classmethod
def print_file_native(cls, pre, name, attr):
def print_file_native(cls: Type[CLASSTYPE], pre: str,
name: str, attr: str) -> None:
"""print a file node"""
nobad = fix_badchars(name)
out = f'{pre}{nobad}'
@ -39,22 +46,26 @@ class NodePrinter:
sys.stdout.write(f'{out}\n')
@classmethod
def print_dir_native(cls, pre, name, depth='', attr=None):
def print_dir_native(cls: Type[CLASSTYPE], pre: str,
name: str,
depth: int = 0,
attr: Optional[List[Tuple[str, str]]] = None) -> None:
"""print a directory node"""
end = []
if depth != '':
if depth > 0:
end.append(f'{cls.NBFILES}:{depth}')
if attr:
end.append(' '.join([f'{x}:{y}' for x, y in attr]))
end_string = ''
if end:
endstring = ', '.join(end)
end = f' [{endstring}]'
end_string = f' [{", ".join(end)}]'
out = pre + Colors.BLUE + fix_badchars(name) + Colors.RESET
out += f'{Colors.GRAY}{end}{Colors.RESET}'
out += f'{Colors.GRAY}{end_string}{Colors.RESET}'
sys.stdout.write(f'{out}\n')
@classmethod
def print_archive_native(cls, pre, name, archive):
def print_archive_native(cls: Type[CLASSTYPE], pre: str,
name: str, archive: str) -> None:
"""archive to stdout"""
out = pre + Colors.YELLOW + fix_badchars(name) + Colors.RESET
out += f' {Colors.GRAY}[{cls.ARCHIVE}:{archive}]{Colors.RESET}'

@ -2,14 +2,15 @@
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
Class that represents a node in the catalog tree
Class that process nodes in the catalog tree
"""
import os
import shutil
import time
import anytree
from pyfzf.pyfzf import FzfPrompt
from typing import List, Union, Tuple, Any, Optional, Dict
import anytree # type: ignore
from pyfzf.pyfzf import FzfPrompt # type: ignore
# local imports
from catcli.utils import size_to_str, epoch_to_str, md5sum, fix_badchars
@ -44,7 +45,9 @@ class Noder:
'maccess,md5,nbfiles,free_space,'
'total_space,meta')
def __init__(self, debug=False, sortsize=False, arc=False):
def __init__(self, debug: bool = False,
sortsize: bool = False,
arc: bool = False) -> None:
"""
@debug: debug mode
@sortsize: sort nodes by size
@ -58,11 +61,12 @@ class Noder:
self.decomp = Decomp()
@staticmethod
def get_storage_names(top):
def get_storage_names(top: anytree.AnyNode) -> List[str]:
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def get_storage_node(self, top, name, path=None):
def get_storage_node(self, top: anytree.AnyNode,
name: str, path: str = '') -> anytree.AnyNode:
"""
return the storage node if any
if path is submitted, it will update the media info
@ -81,7 +85,8 @@ class Noder:
return found
@staticmethod
def get_node(top, path, quiet=False):
def get_node(top: str, path: str,
quiet: bool = False) -> anytree.AnyNode:
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
try:
@ -92,7 +97,10 @@ class Noder:
Logger.err(f'No node at path \"{bpath}\"')
return None
def get_node_if_changed(self, top, path, treepath):
def get_node_if_changed(self,
top: anytree.AnyNode,
path: str,
treepath: str) -> Tuple[anytree.AnyNode, bool]:
"""
return the node (if any) and if it has changed
@top: top node (storage)
@ -128,17 +136,18 @@ class Noder:
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
def rec_size(self, node, store=True):
def rec_size(self, node: anytree.AnyNode,
store: bool = True) -> float:
"""
recursively traverse tree and return size
@store: store the size in the node
"""
if node.type == self.TYPE_FILE:
self._debug(f'getting node size for \"{node.name}\"')
return node.size
return float(node.size)
msg = f'getting node size recursively for \"{node.name}\"'
self._debug(msg)
size = 0
size: float = 0
for i in node.children:
if node.type == self.TYPE_DIR:
size = self.rec_size(i, store=store)
@ -160,7 +169,7 @@ class Noder:
# public helpers
###############################################################
@staticmethod
def format_storage_attr(attr):
def format_storage_attr(attr: Union[str, List[str]]) -> str:
"""format the storage attr for saving"""
if not attr:
return ''
@ -169,18 +178,19 @@ class Noder:
attr = attr.rstrip()
return attr
def set_hashing(self, val):
def set_hashing(self, val: bool) -> None:
"""hash files when indexing"""
self.hash = val
###############################################################
# node creation
###############################################################
def new_top_node(self):
def new_top_node(self) -> anytree.AnyNode:
"""create a new top node"""
return anytree.AnyNode(name=self.NAME_TOP, type=self.TYPE_TOP)
def new_file_node(self, name, path, parent, storagepath):
def new_file_node(self, name: str, path: str,
parent: str, storagepath: str) -> anytree.AnyNode:
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err(f'File \"{path}\" does not exist')
@ -191,14 +201,16 @@ class Noder:
except OSError as exc:
Logger.err(f'OSError: {exc}')
return None
md5 = None
md5 = ''
if self.hash:
md5 = self._get_hash(path)
relpath = os.sep.join([storagepath, name])
maccess = os.path.getmtime(path)
node = self._new_generic_node(name, self.TYPE_FILE, relpath, parent,
size=stat.st_size, md5=md5,
node = self._new_generic_node(name, self.TYPE_FILE,
relpath, parent,
size=stat.st_size,
md5=md5,
maccess=maccess)
if self.arc:
ext = os.path.splitext(path)[1][1:]
@ -210,7 +222,8 @@ class Noder:
self._debug(f'{path} is NOT an archive')
return node
def new_dir_node(self, name, path, parent, storagepath):
def new_dir_node(self, name: str, path: str,
parent: str, storagepath: str) -> anytree.AnyNode:
"""create a new node representing a directory"""
path = os.path.abspath(path)
relpath = os.sep.join([storagepath, name])
@ -218,7 +231,10 @@ class Noder:
return self._new_generic_node(name, self.TYPE_DIR, relpath,
parent, maccess=maccess)
def new_storage_node(self, name, path, parent, attr=None):
def new_storage_node(self, name: str,
path: str,
parent: str,
attr: Optional[str] = None) -> anytree.AnyNode:
"""create a new node representing a storage"""
path = os.path.abspath(path)
free = shutil.disk_usage(path).free
@ -227,15 +243,19 @@ class Noder:
return anytree.AnyNode(name=name, type=self.TYPE_STORAGE, free=free,
total=total, parent=parent, attr=attr, ts=epoch)
def new_archive_node(self, name, path, parent, archive):
def new_archive_node(self, name: str, path: str,
parent: str, archive: str) -> anytree.AnyNode:
"""create a new node for archive data"""
return anytree.AnyNode(name=name, type=self.TYPE_ARC, relpath=path,
parent=parent, size=0, md5=None,
archive=archive)
@staticmethod
def _new_generic_node(name, nodetype, relpath, parent,
size=None, md5=None, maccess=None):
def _new_generic_node(name: str, nodetype: str,
relpath: str, parent: str,
size: float = 0,
md5: str = '',
maccess: float = 0) -> anytree.AnyNode:
"""generic node creation"""
return anytree.AnyNode(name=name, type=nodetype, relpath=relpath,
parent=parent, size=size,
@ -244,21 +264,22 @@ class Noder:
###############################################################
# node management
###############################################################
def update_metanode(self, top):
def update_metanode(self, top: anytree.AnyNode) -> anytree.AnyNode:
"""create or update meta node information"""
meta = self._get_meta_node(top)
epoch = int(time.time())
if not meta:
attr = {}
attr: Dict[str, Any] = {}
attr['created'] = epoch
attr['created_version'] = VERSION
meta = anytree.AnyNode(name=self.NAME_META, type=self.TYPE_META,
meta = anytree.AnyNode(name=self.NAME_META,
type=self.TYPE_META,
attr=attr)
meta.attr['access'] = epoch
meta.attr['access_version'] = VERSION
return meta
def _get_meta_node(self, top):
def _get_meta_node(self, top: anytree.AnyNode) -> anytree.AnyNode:
"""return the meta node if any"""
try:
return next(filter(lambda x: x.type == self.TYPE_META,
@ -266,7 +287,7 @@ class Noder:
except StopIteration:
return None
def clean_not_flagged(self, top):
def clean_not_flagged(self, top: anytree.AnyNode) -> int:
"""remove any node not flagged and clean flags"""
cnt = 0
for node in anytree.PreOrderIter(top):
@ -277,11 +298,11 @@ class Noder:
return cnt
@staticmethod
def flag(node):
def flag(node: anytree.AnyNode) -> None:
"""flag a node"""
node.flag = True
def _clean(self, node):
def _clean(self, node: anytree.AnyNode) -> bool:
"""remove node if not flagged"""
if not self._has_attr(node, 'flag') or \
not node.flag:
@ -293,7 +314,9 @@ class Noder:
###############################################################
# printing
###############################################################
def _node_to_csv(self, node, sep=',', raw=False):
def _node_to_csv(self, node: anytree.AnyNode,
sep: str = ',',
raw: bool = False) -> None:
"""
print a node to csv
@node: the node to consider
@ -353,9 +376,13 @@ class Noder:
if len(line) > 0:
Logger.stdout_nocolor(line)
def _print_node_native(self, node, pre='', withpath=False,
withdepth=False, withstorage=False,
recalcparent=False, raw=False):
def _print_node_native(self, node: anytree.AnyNode,
pre: str = '',
withpath: bool = False,
withdepth: bool = False,
withstorage: bool = False,
recalcparent: bool = False,
raw: bool = False) -> None:
"""
print a node
@node: the node to print
@ -380,11 +407,11 @@ class Noder:
name = name.lstrip(os.sep)
if withstorage:
storage = self._get_storage(node)
attr = ''
attr_str = ''
if node.md5:
attr = f', md5:{node.md5}'
attr_str = f', md5:{node.md5}'
size = size_to_str(node.size, raw=raw)
compl = f'size:{size}{attr}'
compl = f'size:{size}{attr_str}'
if withstorage:
content = Logger.get_bold_text(storage.name)
compl += f', storage:{content}'
@ -398,16 +425,16 @@ class Noder:
else:
name = node.relpath
name = name.lstrip(os.sep)
depth = ''
depth = 0
if withdepth:
depth = len(node.children)
if withstorage:
storage = self._get_storage(node)
attr = []
attr: List[Tuple[str, str]] = []
if node.size:
attr.append(['totsize', size_to_str(node.size, raw=raw)])
attr.append(('totsize', size_to_str(node.size, raw=raw)))
if withstorage:
attr.append(['storage', Logger.get_bold_text(storage.name)])
attr.append(('storage', Logger.get_bold_text(storage.name)))
NodePrinter.print_dir_native(pre, name, depth=depth, attr=attr)
elif node.type == self.TYPE_STORAGE:
# node of type storage
@ -423,9 +450,9 @@ class Noder:
timestamp += epoch_to_str(node.ts)
disksize = ''
# the children size
size = self.rec_size(node, store=False)
size = size_to_str(size, raw=raw)
disksize = 'totsize:' + f'{size}'
recsize = self.rec_size(node, store=False)
sizestr = size_to_str(recsize, raw=raw)
disksize = 'totsize:' + f'{sizestr}'
# format the output
name = node.name
args = [
@ -446,9 +473,9 @@ class Noder:
else:
Logger.err(f'bad node encountered: {node}')
def print_tree(self, node,
fmt='native',
raw=False):
def print_tree(self, node: anytree.AnyNode,
fmt: str = 'native',
raw: bool = False) -> None:
"""
print the tree in different format
@node: start node
@ -470,20 +497,21 @@ class Noder:
Logger.stdout_nocolor(self.CSV_HEADER)
self._to_csv(node, raw=raw)
def _to_csv(self, node, raw=False):
def _to_csv(self, node: anytree.AnyNode,
raw: bool = False) -> None:
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for _, _, item in rend:
self._node_to_csv(item, raw=raw)
@staticmethod
def _fzf_prompt(strings):
def _fzf_prompt(strings: Any) -> Any:
# prompt with fzf
fzf = FzfPrompt()
selected = fzf.prompt(strings)
return selected
def _to_fzf(self, node, fmt):
def _to_fzf(self, node: anytree.AnyNode, fmt: str) -> None:
"""
fzf prompt with list and print selected node(s)
@node: node to start with
@ -512,7 +540,8 @@ class Noder:
self.print_tree(rend, fmt=subfmt)
@staticmethod
def to_dot(node, path='tree.dot'):
def to_dot(node: anytree.AnyNode,
path: str = 'tree.dot') -> str:
"""export to dot for graphing"""
anytree.exporter.DotExporter(node).to_dotfile(path)
Logger.info(f'dot file created under \"{path}\"')
@ -521,10 +550,14 @@ class Noder:
###############################################################
# searching
###############################################################
def find_name(self, top, key,
script=False, only_dir=False,
startpath=None, parentfromtree=False,
fmt='native', raw=False):
def find_name(self, top: anytree.AnyNode,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: anytree.AnyNode = None,
parentfromtree: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[anytree.AnyNode]:
"""
find files based on their names
@top: top node
@ -541,8 +574,8 @@ class Noder:
# search for nodes based on path
start = top
if startpath:
start = self.get_node(top, startpath)
if startnode:
start = self.get_node(top, startnode)
filterfunc = self._callback_find_name(key, only_dir)
found = anytree.findall(start, filter_=filterfunc)
nbfound = len(found)
@ -591,9 +624,9 @@ class Noder:
return list(paths.values())
def _callback_find_name(self, term, only_dir):
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
"""callback for finding files"""
def find_name(node):
def find_name(node: anytree.AnyNode) -> bool:
if node.type == self.TYPE_STORAGE:
# ignore storage nodes
return False
@ -620,10 +653,11 @@ class Noder:
###############################################################
# ls
###############################################################
def list(self, top, path,
rec=False,
fmt='native',
raw=False):
def list(self, top: anytree.AnyNode,
path: str,
rec: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[anytree.AnyNode]:
"""
list nodes for "ls"
@top: top node
@ -684,7 +718,9 @@ class Noder:
###############################################################
# tree creation
###############################################################
def _add_entry(self, name, top, resolv):
def _add_entry(self, name: str,
top: anytree.AnyNode,
resolv: Any) -> None:
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
@ -698,7 +734,7 @@ class Noder:
except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, name, top, top.name)
def list_to_tree(self, parent, names):
def list_to_tree(self, parent: anytree.AnyNode, names: List[str]) -> None:
"""convert list of files to a tree"""
if not names:
return
@ -710,43 +746,44 @@ class Noder:
###############################################################
# diverse
###############################################################
def _sort_tree(self, items):
def _sort_tree(self,
items: List[anytree.AnyNode]) -> List[anytree.AnyNode]:
"""sorting a list of items"""
return sorted(items, key=self._sort, reverse=self.sortsize)
def _sort(self, lst):
def _sort(self, lst: List[anytree.AnyNode]) -> Any:
"""sort a list"""
if self.sortsize:
return self._sort_size(lst)
return self._sort_fs(lst)
@staticmethod
def _sort_fs(node):
def _sort_fs(node: anytree.AnyNode) -> Tuple[str, str]:
"""sorting nodes dir first and alpha"""
return (node.type, node.name.lstrip('.').lower())
@staticmethod
def _sort_size(node):
def _sort_size(node: anytree.AnyNode) -> float:
"""sorting nodes by size"""
try:
if not node.size:
return 0
return node.size
return float(node.size)
except AttributeError:
return 0
def _get_storage(self, node):
def _get_storage(self, node: anytree.AnyNode) -> anytree.AnyNode:
"""recursively traverse up to find storage"""
if node.type == self.TYPE_STORAGE:
return node
return node.ancestors[1]
@staticmethod
def _has_attr(node, attr):
def _has_attr(node: anytree.AnyNode, attr: str) -> bool:
"""return True if node has attr as attribute"""
return attr in node.__dict__.keys()
def _get_parents(self, node):
def _get_parents(self, node: anytree.AnyNode) -> str:
"""get all parents recursively"""
if node.type == self.TYPE_STORAGE:
return ''
@ -755,25 +792,25 @@ class Noder:
parent = self._get_parents(node.parent)
if parent:
return os.sep.join([parent, node.name])
return node.name
return str(node.name)
@staticmethod
def _get_hash(path):
def _get_hash(path: str) -> str:
"""return md5 hash of node"""
try:
return md5sum(path)
except CatcliException as exc:
Logger.err(str(exc))
return None
return ''
@staticmethod
def _sanitize(node):
def _sanitize(node: anytree.AnyNode) -> anytree.AnyNode:
"""sanitize node strings"""
node.name = fix_badchars(node.name)
node.relpath = fix_badchars(node.relpath)
return node
def _debug(self, string):
def _debug(self, string: str) -> None:
"""print debug"""
if not self.debug:
return

@ -18,7 +18,7 @@ from catcli.exceptions import CatcliException
SEPARATOR = '/'
def md5sum(path):
def md5sum(path: str) -> str:
"""
calculate md5 sum of a file
may raise exception
@ -39,10 +39,11 @@ def md5sum(path):
pass
except OSError as exc:
raise CatcliException(f'md5sum error: {exc}') from exc
return None
return ''
def size_to_str(size, raw=True):
def size_to_str(size: float,
raw: bool = True) -> str:
"""convert size to string, optionally human readable"""
div = 1024.
suf = ['B', 'K', 'M', 'G', 'T', 'P']
@ -56,7 +57,7 @@ def size_to_str(size, raw=True):
return f'{size:.1f}{sufix}'
def epoch_to_str(epoch):
def epoch_to_str(epoch: int) -> str:
"""convert epoch to string"""
if not epoch:
return ''
@ -65,18 +66,18 @@ def epoch_to_str(epoch):
return timestamp.strftime(fmt)
def ask(question):
def ask(question: str) -> bool:
"""ask the user what to do"""
resp = input(f'{question} [y|N] ? ')
return resp.lower() == 'y'
def edit(string):
def edit(string: str) -> str:
"""edit the information with the default EDITOR"""
string = string.encode('utf-8')
data = string.encode('utf-8')
editor = os.environ.get('EDITOR', 'vim')
with tempfile.NamedTemporaryFile(prefix='catcli', suffix='.tmp') as file:
file.write(string)
file.write(data)
file.flush()
subprocess.call([editor, file.name])
file.seek(0)
@ -84,6 +85,6 @@ def edit(string):
return new.decode('utf-8')
def fix_badchars(string):
def fix_badchars(string: str) -> str:
"""fix none utf-8 chars in string"""
return string.encode('utf-8', 'ignore').decode('utf-8')

@ -6,8 +6,11 @@ Catcli filesystem indexer
"""
import os
from typing import Tuple
import anytree # type: ignore
# local imports
from catcli.noder import Noder
from catcli.logger import Logger
@ -16,8 +19,10 @@ class Walker:
MAXLINELEN = 80 - 15
def __init__(self, noder, usehash=True, debug=False,
logpath=None):
def __init__(self, noder: Noder,
usehash: bool = True,
debug: bool = False,
logpath: str = ''):
"""
@noder: the noder to use
@hash: calculate hash of nodes
@ -30,7 +35,10 @@ class Walker:
self.debug = debug
self.lpath = logpath
def index(self, path, parent, name, storagepath=''):
def index(self, path: str,
parent: str,
name: str,
storagepath: str = '') -> Tuple[str, int]:
"""
index a directory and store in tree
@path: path to index
@ -39,7 +47,8 @@ class Walker:
"""
self._debug(f'indexing starting at {path}')
if not parent:
parent = self.noder.new_dir_node(name, path, parent)
parent = self.noder.new_dir_node(name, path,
parent, storagepath)
if os.path.islink(path):
rel = os.readlink(path)
@ -77,16 +86,19 @@ class Walker:
_, cnt2 = self.index(sub, dummy, base, nstoragepath)
cnt += cnt2
break
self._progress(None)
self._progress('')
return parent, cnt
def reindex(self, path, parent, top):
def reindex(self, path: str, parent: str, top: str) -> int:
"""reindex a directory and store in tree"""
cnt = self._reindex(path, parent, top)
cnt += self.noder.clean_not_flagged(parent)
return cnt
def _reindex(self, path, parent, top, storagepath=''):
def _reindex(self, path: str,
parent: str,
top: anytree.AnyNode,
storagepath: str = '') -> int:
"""
reindex a directory and store in tree
@path: directory path to re-index
@ -131,7 +143,10 @@ class Walker:
break
return cnt
def _need_reindex(self, top, path, treepath):
def _need_reindex(self,
top: anytree.AnyNode,
path: str,
treepath: str) -> Tuple[bool, anytree.AnyNode]:
"""
test if node needs re-indexing
@top: top node (storage)
@ -153,13 +168,13 @@ class Walker:
cnode.parent = None
return True, cnode
def _debug(self, string):
def _debug(self, string: str) -> None:
"""print to debug"""
if not self.debug:
return
Logger.debug(string)
def _progress(self, string):
def _progress(self, string: str) -> None:
"""print progress"""
if self.debug:
return
@ -171,7 +186,7 @@ class Walker:
string = string[:self.MAXLINELEN] + '...'
Logger.progr(f'indexing: {string:80}')
def _log2file(self, string):
def _log2file(self, string: str) -> None:
"""log to file"""
if not self.lpath:
return

Loading…
Cancel
Save