You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
OnionIngestor/onioningestor/__init__.py

148 lines
5.3 KiB
Python

import sys
import time
import queue
import traceback
import threading
import collections
from queue import Queue, Empty
from itertools import islice
from . import config
from . import loghandler
from onioningestor.databases import StorageDispatcher, StorageThread, StorageSync
class Ingestor:
    """ThreatIngestor main work logic.

    Handles reading the config file, calling sources, maintaining state, and
    sending artifacts to operators.
    """

    def __init__(self, args):
        """Initialize logging, config, queues, storage engines and operators.

        Args:
            args: parsed CLI arguments; must expose ``logLevel`` and
                ``configFile``.

        Exits the process with status 1 if any plugin fails to initialize.
        """
        # Load logger
        log = loghandler.LoggerHandler(args.logLevel)
        self.logger = log.start_logging()
        # Load config
        self.config = config.Config(args.configFile, self.logger)
        self.blacklist = self.config.blacklist()
        # Create Queues
        self.queue = self.config.monitorQueue()
        # Whether onions are saved asynchronously (threads) or synchronously
        self.save_thread = self.config.save_thread()
        # Track some statistics about artifacts in a summary object.
        self.summary = collections.Counter()
        # Threads
        self.threads = []
        try:
            # Load Storage Engines - ElasticSearch, Telegram, Twitter etc
            self.storage = StorageDispatcher(self.logger)
            for name, db, kwargs in self.config.database_engines():
                if self.save_thread:
                    # Start a daemon thread handling database storage.
                    self.logger.debug(f"Starting daemon thread for {str(db)}")
                    t = StorageThread(db(self.logger, **kwargs))
                    self.threads.append(t)
                    # setDaemon() is deprecated since Python 3.10;
                    # assign the attribute instead.
                    t.daemon = True
                    t.start()
                else:
                    # Save onions synchronously.
                    s = StorageSync(db(self.logger, **kwargs))
                    self.storage.add_storage(s)
            if self.save_thread:
                self.logger.info("Onions will be saved asynchronously")
            else:
                self.logger.info("Onions will be saved synchronously")
            # Instantiate operator plugins.
            self.logger.debug("initializing operators")
            self.operators = {
                name: operator(self.logger, self.config.torController(),
                               self.blacklist, **kwargs)
                for name, operator, kwargs in self.config.operators()
            }
        except Exception as e:
            # Error loading/starting plugins: log and abort.
            self.logger.error(e)
            # FIX: print_exc() writes to stderr and returns None (so the
            # logger recorded "None"); format_exc() returns the traceback text.
            self.logger.debug(traceback.format_exc())
            sys.exit(1)

    def collect_sources(self):
        """Start every configured source.

        Each source collects onion links from the clear net and feeds them
        into ``self.queue``. Failures in one source are logged and skipped so
        the remaining sources still start.
        """
        self.logger.debug("Initializing sources")
        for name, collect, kwargs in self.config.sources():
            # Run the source to collect onion links from clear net.
            self.logger.info(f"Running source '{name}'")
            try:
                # get the generator of onions
                source = collect(self.logger, **kwargs)
                source.set_onionQueue(self.queue)  # priority 2
                t = source.run()
                self.threads.append(t)
            except Exception as e:
                self.logger.error(e)
                # FIX: format_exc() returns the traceback string;
                # print_exc() returns None.
                self.logger.error(traceback.format_exc())
                continue

    def process(self, onion):
        """Pass a found onion through every configured operator.

        Args:
            onion: the queue item to process (operators also receive the
                shared crawl queue so they can enqueue further links).
        """
        for operator in self.operators:
            self.logger.info(f"Processing found onions with operator '{operator}'")
            # Set CrawlQueue for every operator
            self.operators[operator].set_crawlQueue(self.queue)
            # Process list of onions
            self.operators[operator].process(onion)

    def run(self):
        """Run once, or forever, depending on config."""
        if self.config.daemon():
            self.logger.info("Running forever, in a loop")
            self.run_forever()
        else:
            self.logger.info("Running once, to completion")
            self.run_once()

    def run_once(self):
        """Run each source once, passing artifacts to each operator.

        Drains ``self.queue`` until it stays empty for 5 seconds. Queue
        priorities: monitorQueue is high, clearnet onions medium, onions
        found while crawling low. Ctrl-C signals all worker threads to die
        and exits the process.
        """
        # Start collecting sources
        self.collect_sources()
        done = False
        while not done:
            try:
                # Block up to 5 seconds for the next onion.
                onion = self.queue.get(True, 5)
                # Process onions with each operator.
                self.process(onion)
                # Save onions for each storage.
                self.storage.save_pastie(onion[1], 30)
            except Empty:
                self.logger.info('Queue is empty')
                done = True
            except Exception as e:
                self.logger.error(e)
                # FIX: format_exc() instead of print_exc() (returns None).
                self.logger.error(traceback.format_exc())
                break
            except KeyboardInterrupt:
                # Reachable: KeyboardInterrupt is not an Exception subclass.
                print('')
                self.logger.info("Ctrl-c received! Sending kill to threads...")
                for t in self.threads:
                    t.kill_received = True
                self.logger.info('Exiting')
                sys.exit(0)

    def run_forever(self):
        """Run forever, sleeping for the configured interval between each run."""
        while True:
            self.run_once()
            self.logger.debug(f"Sleeping for {self.config.sleep()} seconds")
            time.sleep(self.config.sleep())