You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
catcli/catcli/noder.py

743 lines
26 KiB
Python

"""
author: deadc0de6 (https://github.com/deadc0de6)
Copyright (c) 2017, deadc0de6
Class that process nodes in the catalog tree
"""
import os
import shutil
import time
from typing import List, Union, Tuple, Any, Optional, Dict, cast
import fnmatch
import anytree
from natsort import os_sort_keygen
# local imports
from catcli import nodes
from catcli.nodes import NodeAny, NodeStorage, \
NodeTop, NodeFile, NodeArchived, NodeDir, NodeMeta, \
typcast_node
from catcli.utils import md5sum
from catcli.logger import Logger
from catcli.printer_native import NativePrinter
from catcli.printer_csv import CsvPrinter
from catcli.decomp import Decomp
from catcli.version import __version__ as VERSION
from catcli.exceptions import CatcliException
class Noder:
"""
handles node in the catalog tree
There are 4 types of node:
* "top" node representing the top node (generic node)
* "storage" node representing a storage
* "dir" node representing a directory
* "file" node representing a file
"""
# pylint: disable=R0904
def __init__(self, debug: bool = False,
sortsize: bool = False,
arc: bool = False) -> None:
"""
@debug: debug mode
@sortsize: sort nodes by size
@arch: handle archive
"""
self.hash = True
self.debug = debug
self.sortsize = sortsize
self.arc = arc
if self.arc:
self.decomp = Decomp()
self.csv_printer = CsvPrinter()
self.native_printer = NativePrinter()
@staticmethod
def get_storage_names(top: NodeTop) -> List[str]:
"""return a list of all storage names"""
return [x.name for x in list(top.children)]
def find_storage_node_by_name(self, top: NodeTop,
name: str) -> Optional[NodeStorage]:
"""find a storage node by name"""
for node in top.children:
if node.type != nodes.TYPE_STORAGE:
continue
if node.name == name:
return cast(NodeStorage, node)
return None
def update_storage_path(self, top: NodeTop,
name: str,
newpath: str) -> None:
"""find and update storage path on update"""
storage = self.find_storage_node_by_name(top, name)
if storage and newpath and os.path.exists(newpath):
storage.free = shutil.disk_usage(newpath).free
storage.total = shutil.disk_usage(newpath).total
storage.ts = int(time.time())
@staticmethod
def get_node(top: NodeTop,
path: str,
quiet: bool = False) -> Optional[NodeAny]:
"""get the node by internal tree path"""
resolv = anytree.resolver.Resolver('name')
bpath = ''
try:
bpath = os.path.basename(path)
the_node = resolv.get(top, bpath)
typcast_node(the_node)
return cast(NodeAny, the_node)
except anytree.resolver.ChildResolverError:
if not quiet:
Logger.err(f'No node at path \"{bpath}\"')
return None
def get_node_if_changed(self,
top: NodeTop,
path: str,
treepath: str) -> Tuple[Optional[NodeAny], bool]:
"""
return the node (if any) and if it has changed
@top: top node (storage)
@path: abs path to file
@treepath: rel path from indexed directory
"""
treepath = treepath.lstrip(os.sep)
node = self.get_node(top, treepath, quiet=True)
# node does not exist
if not node:
self._debug('\tchange: node does not exist')
return None, True
if os.path.isdir(path):
return node, False
# force re-indexing if no maccess
maccess = os.path.getmtime(path)
if not node.has_attr('maccess') or \
not node.maccess:
self._debug('\tchange: no maccess found')
return node, True
# maccess changed
old_maccess = node.maccess
if float(maccess) != float(old_maccess):
self._debug(f'\tchange: maccess changed for \"{path}\"')
return node, True
# test hash
if self.hash and node.md5:
md5 = self._get_hash(path)
if md5 and md5 != node.md5:
msg = f'\tchange: checksum changed for \"{path}\"'
self._debug(msg)
return node, True
self._debug(f'\tchange: no change for \"{path}\"')
return node, False
###############################################################
# public helpers
###############################################################
@staticmethod
def attrs_to_string(attr: Union[List[str], Dict[str, str], str]) -> str:
"""format the storage attr for saving"""
if not attr:
return ''
if isinstance(attr, list):
return ', '.join(attr)
if isinstance(attr, dict):
ret = []
for key, val in attr.items():
ret.append(f'{key}={val}')
return ', '.join(ret)
attr = attr.rstrip()
return attr
def do_hashing(self, val: bool) -> None:
"""hash files when indexing"""
self.hash = val
###############################################################
# node creation
###############################################################
def new_top_node(self) -> NodeTop:
"""create a new top node"""
top = NodeTop(nodes.NAME_TOP)
self._debug(f'new top node: {top}')
return top
def new_file_node(self, name: str, path: str,
parent: NodeAny) -> Optional[NodeFile]:
"""create a new node representing a file"""
if not os.path.exists(path):
Logger.err(f'File \"{path}\" does not exist')
return None
path = os.path.abspath(path)
try:
stat = os.lstat(path)
except OSError as exc:
Logger.err(f'OSError: {exc}')
return None
md5 = ''
if self.hash:
md5 = self._get_hash(path)
maccess = os.path.getmtime(path)
node = NodeFile(name,
stat.st_size,
md5,
maccess,
parent=parent)
if self.arc:
ext = os.path.splitext(path)[1][1:]
if ext.lower() in self.decomp.get_formats():
self._debug(f'{path} is an archive')
names = self.decomp.get_names(path)
self.list_to_tree(node, names)
else:
self._debug(f'{path} is NOT an archive')
return node
def new_dir_node(self, name: str, path: str,
parent: NodeAny) -> NodeDir:
"""create a new node representing a directory"""
path = os.path.abspath(path)
maccess = os.path.getmtime(path)
return NodeDir(name,
0,
maccess,
parent=parent)
def new_storage_node(self, name: str,
path: str,
parent: str,
attrs: Dict[str, Any]) \
-> NodeStorage:
"""create a new node representing a storage"""
path = os.path.abspath(path)
free = shutil.disk_usage(path).free
total = shutil.disk_usage(path).total
epoch = int(time.time())
return NodeStorage(name,
free,
total,
0,
epoch,
self.attrs_to_string(attrs),
parent=parent)
def new_archive_node(self,
name: str,
parent: str,
archive: str) -> NodeArchived:
"""create a new node for archive data"""
return NodeArchived(name=name,
parent=parent, nodesize=0, md5='',
archive=archive)
###############################################################
# node management
###############################################################
def update_metanode(self, top: NodeTop) -> NodeMeta:
"""create or update meta node information"""
meta = self._get_meta_node(top)
epoch = int(time.time())
if not meta:
attrs: Dict[str, Any] = {}
attrs['created'] = epoch
attrs['created_version'] = VERSION
meta = NodeMeta(name=nodes.NAME_META,
attr=attrs)
meta.attr['access'] = epoch
meta.attr['access_version'] = VERSION
return meta
def _get_meta_node(self, top: NodeTop) -> Optional[NodeMeta]:
"""return the meta node if any"""
try:
found = next(filter(lambda x: x.type == nodes.TYPE_META,
top.children))
return cast(NodeMeta, found)
except StopIteration:
return None
def clean_not_flagged(self, top: NodeTop) -> int:
"""remove any node not flagged and clean flags"""
cnt = 0
for node in anytree.PreOrderIter(top):
typcast_node(node)
if node.type not in [nodes.TYPE_DIR, nodes.TYPE_FILE]:
continue
if self._clean(node):
cnt += 1
return cnt
def _clean(self, node: NodeAny) -> bool:
"""remove node if not flagged"""
if not node.flagged():
node.parent = None
return True
node.unflag()
return False
###############################################################
# printing
###############################################################
def _print_node_csv(self, node: NodeAny,
sep: str = ',',
raw: bool = False) -> None:
"""
print a node to csv
@node: the node to consider
@sep: CSV separator character
@raw: print raw size rather than human readable
"""
typcast_node(node)
if not node:
return
if node.type == nodes.TYPE_TOP:
return
if node.type == nodes.TYPE_STORAGE:
self.csv_printer.print_storage(node,
sep=sep,
raw=raw)
else:
self.csv_printer.print_node(node,
sep=sep,
raw=raw)
def _print_node_du(self, node: NodeAny,
raw: bool = False) -> None:
"""
print node du style
"""
typcast_node(node)
thenodes = self._get_entire_tree(node,
dironly=True)
for thenode in thenodes:
self.native_printer.print_du(thenode, raw=raw)
def _print_node_native(self, node: NodeAny,
pre: str = '',
withpath: bool = False,
withnbchildren: bool = False,
withstorage: bool = False,
raw: bool = False) -> None:
"""
print a node
@node: the node to print
@pre: string to print before node
@withpath: print the node path
@withnbchildren: print the node nb children
@withstorage: print the node storage it belongs to
@raw: print raw size rather than human readable
"""
typcast_node(node)
if node.type == nodes.TYPE_TOP:
# top node
self.native_printer.print_top(pre, node.get_name())
elif node.type == nodes.TYPE_FILE:
# node of type file
self.native_printer.print_file(pre, node,
withpath=withpath,
withstorage=withstorage,
raw=raw)
elif node.type == nodes.TYPE_DIR:
# node of type directory
self.native_printer.print_dir(pre,
node,
withpath=withpath,
withstorage=withstorage,
withnbchildren=withnbchildren,
raw=raw)
elif node.type == nodes.TYPE_STORAGE:
# node of type storage
self.native_printer.print_storage(pre,
node,
raw=raw)
elif node.type == nodes.TYPE_ARCHIVED:
# archive node
if self.arc:
self.native_printer.print_archive(pre, node.name, node.archive)
else:
Logger.err(f'bad node encountered: {node}')
def print_tree(self, node: NodeAny,
fmt: str = 'native',
raw: bool = False) -> None:
"""
print the tree in different format
@node: start node
@style: when fmt=native, defines the tree style
@fmt: output format
@raw: print the raw size rather than human readable
"""
if fmt == 'native':
# "tree" style
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for pre, _, thenode in rend:
self._print_node_native(thenode, pre=pre,
withnbchildren=True, raw=raw)
elif fmt == 'csv':
# csv output
self._print_nodes_csv(node, raw=raw)
elif fmt == 'csv-with-header':
# csv output
self.csv_printer.print_header()
self._print_nodes_csv(node, raw=raw)
def _print_nodes_csv(self, node: NodeAny,
raw: bool = False) -> None:
"""print the tree to csv"""
rend = anytree.RenderTree(node, childiter=self._sort_tree)
for _, _, item in rend:
self._print_node_csv(item, raw=raw)
@staticmethod
def _fzf_prompt(strings: Any) -> Any:
"""prompt with fzf"""
try:
from pyfzf.pyfzf import FzfPrompt # pylint: disable=C0415 # noqa
fzf = FzfPrompt()
selected = fzf.prompt(strings)
return selected
except ModuleNotFoundError:
Logger.err('install pyfzf to use fzf')
return None
def _to_fzf(self, node: NodeAny, fmt: str) -> None:
"""
fzf prompt with list and print selected node(s)
@node: node to start with
@fmt: output format for selected nodes
"""
rendered = anytree.RenderTree(node, childiter=self._sort_tree)
the_nodes = {}
# construct node names list
for _, _, rend in rendered:
if not rend:
continue
parents = rend.get_fullpath()
storage = rend.get_storage_node()
fullpath = os.path.join(storage.get_name(), parents)
the_nodes[fullpath] = rend
# prompt with fzf
paths = self._fzf_prompt(the_nodes.keys())
# print the resulting tree
subfmt = fmt.replace('fzf-', '')
for path in paths:
if not path:
continue
if path not in the_nodes:
continue
rend = the_nodes[path]
self.print_tree(rend, fmt=subfmt)
@staticmethod
def to_dot(top: NodeTop,
path: str = 'tree.dot') -> str:
"""export to dot for graphing"""
anytree.exporter.DotExporter(top).to_dotfile(path)
Logger.info(f'dot file created under \"{path}\"')
return f'dot {path} -T png -o /tmp/tree.png'
###############################################################
# searching
###############################################################
def find(self, top: NodeTop,
key: str,
script: bool = False,
only_dir: bool = False,
startnode: Optional[NodeAny] = None,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
"""
find files based on their names
@top: top node
@key: term to search for
@script: output script
@directory: only search for directories
@startpath: node to start with
@fmt: output format
@raw: raw size output
returns the found nodes
"""
self._debug(f'searching for \"{key}\"')
# search for nodes based on path
start: Optional[NodeAny] = top
if startnode:
start = self.get_node(top, startnode)
filterfunc = self._callback_find_name(key, only_dir)
found = anytree.findall(start, filter_=filterfunc)
self._debug(f'found {len(found)} node(s)')
# compile found nodes
paths = {}
for item in found:
typcast_node(item)
item.set_name(item.get_name())
key = item.get_fullpath()
paths[key] = item
# handle fzf mode
if fmt.startswith('fzf'):
selected = self._fzf_prompt(paths.keys())
newpaths = {}
subfmt = fmt.replace('fzf-', '')
for item in selected:
if item not in paths:
continue
newpaths[item] = paths[item]
self.print_tree(newpaths[item], fmt=subfmt)
paths = newpaths
else:
if fmt == 'native':
for _, item in paths.items():
self._print_node_native(item,
withpath=True,
withnbchildren=True,
withstorage=True,
raw=raw)
elif fmt.startswith('csv'):
if fmt == 'csv-with-header':
self.csv_printer.print_header()
for _, item in paths.items():
self._print_node_csv(item, raw=raw)
# execute script if any
if script:
tmp = ['${source}/' + x for x in paths]
tmpstr = ' '.join(tmp)
cmd = f'op=file; source=/media/mnt; $op {tmpstr}'
Logger.info(cmd)
return list(paths.values())
def _callback_find_name(self, term: str, only_dir: bool) -> Any:
"""callback for finding files"""
def find_name(node: NodeAny) -> bool:
typcast_node(node)
path = node.get_fullpath()
if node.type == nodes.TYPE_STORAGE:
# ignore storage nodes
return False
if node.type == nodes.TYPE_TOP:
# ignore top nodes
return False
if node.type == nodes.TYPE_META:
# ignore meta nodes
return False
if only_dir and node.type == nodes.TYPE_DIR:
# ignore non directory
return False
# filter
if not term:
return True
if term in path:
return True
if self.debug:
Logger.debug(f'match \"{path}\" with \"{term}\"')
if fnmatch.fnmatch(path, term):
return True
# ignore
return False
return find_name
###############################################################
# fixsizes
###############################################################
def fixsizes(self, top: NodeTop) -> None:
"""fix node sizes"""
typcast_node(top)
rend = anytree.RenderTree(top)
for _, _, thenode in rend:
typcast_node(thenode)
thenode.nodesize = thenode.get_rec_size()
###############################################################
# ls
###############################################################
def list(self, top: NodeTop,
path: str,
rec: bool = False,
fmt: str = 'native',
raw: bool = False) -> List[NodeAny]:
"""
list nodes for "ls"
@top: top node
@path: path to search for
@rec: recursive walk
@fmt: output format
@raw: print raw size
"""
self._debug(f'ls walking path: \"{path}\" from \"{top.get_name()}\"')
resolv = anytree.resolver.Resolver('name')
found = []
try:
if '*' in path or '?' in path:
# we need to handle glob
self._debug('glob ls...')
found = resolv.glob(top, path)
else:
# we have a canonical path
self._debug('get ls...')
foundone = resolv.get(top, path)
cast(NodeAny, foundone)
typcast_node(foundone)
if foundone and foundone.may_have_children():
# let's find its children as well
modpath = os.path.join(path, '*')
found = resolv.glob(top, modpath)
else:
found = [foundone]
if len(found) < 1:
# nothing found
self._debug('nothing found')
return []
if rec:
# print the entire tree
self.print_tree(found[0].parent, fmt=fmt, raw=raw)
return found
# sort found nodes
found = sorted(found, key=os_sort_keygen(self._sort))
# print all found nodes
if fmt == 'csv-with-header':
self.csv_printer.print_header()
for item in found:
if fmt == 'native':
self._print_node_native(item,
withpath=True,
withnbchildren=True,
raw=raw)
elif fmt.startswith('csv'):
self._print_node_csv(item, raw=raw)
elif fmt.startswith('fzf'):
self._to_fzf(item, fmt)
except anytree.resolver.ChildResolverError:
pass
return found
###############################################################
# du
###############################################################
def diskusage(self, top: NodeTop,
path: str,
raw: bool = False) -> List[NodeAny]:
"""disk usage"""
self._debug(f'du walking path: \"{path}\" from \"{top.get_name()}\"')
resolv = anytree.resolver.Resolver('name')
found: NodeAny
try:
# we have a canonical path
self._debug('get du...')
found = resolv.get(top, path)
if not found:
# nothing found
self._debug('nothing found')
return []
self._debug(f'du found: {found}')
self._print_node_du(found, raw=raw)
except anytree.resolver.ChildResolverError:
pass
return found
###############################################################
# tree creation
###############################################################
def _add_entry(self, name: str,
top: NodeTop,
resolv: Any) -> None:
"""add an entry to the tree"""
entries = name.rstrip(os.sep).split(os.sep)
if len(entries) == 1:
self.new_archive_node(name, top, top.get_name())
return
sub = os.sep.join(entries[:-1])
nodename = entries[-1]
try:
parent = resolv.get(top, sub)
parent = self.new_archive_node(nodename, parent, top.get_name())
except anytree.resolver.ChildResolverError:
self.new_archive_node(nodename, top, top.get_name())
def list_to_tree(self, parent: NodeAny, names: List[str]) -> None:
"""convert list of files to a tree"""
if not names:
return
resolv = anytree.resolver.Resolver('name')
for name in names:
name = name.rstrip(os.sep)
self._add_entry(name, parent, resolv)
###############################################################
# diverse
###############################################################
def _get_entire_tree(self, start: NodeAny,
dironly: bool = False) -> List[NodeAny]:
"""
get entire tree and sort it
"""
typcast_node(start)
rend = anytree.RenderTree(start)
thenodes = []
if dironly:
for _, _, thenode in rend:
typcast_node(thenode)
if thenode.type == nodes.TYPE_DIR:
thenodes.append(thenode)
else:
thenodes = [x for _, _, x in rend]
return sorted(thenodes, key=os_sort_keygen(self._sort))
def _sort_tree(self,
items: List[NodeAny]) -> List[NodeAny]:
"""sorting a list of items"""
return sorted(items, key=self._sort, reverse=self.sortsize)
def _sort(self, lst: NodeAny) -> Any:
"""sort a list"""
if self.sortsize:
return self._sort_size(lst)
return self._sort_fs(lst)
@staticmethod
def _sort_fs(node: NodeAny) -> str:
"""sort by name"""
# to sort by types then name
return str(node.name)
@staticmethod
def _sort_size(node: NodeAny) -> float:
"""sorting nodes by size"""
try:
if not node.nodesize:
return 0
return float(node.nodesize)
except AttributeError:
return 0
@staticmethod
def _get_hash(path: str) -> str:
"""return md5 hash of node"""
try:
return md5sum(path)
except CatcliException as exc:
Logger.err(str(exc))
return ''
def _debug(self, string: str) -> None:
"""print debug"""
if not self.debug:
return
Logger.debug(string)