|
|
|
@ -4,7 +4,7 @@ import queue
|
|
|
|
|
import traceback
|
|
|
|
|
import threading
|
|
|
|
|
import collections
|
|
|
|
|
from queue import Queue
|
|
|
|
|
from queue import Queue, Empty
|
|
|
|
|
from itertools import islice
|
|
|
|
|
|
|
|
|
|
from . import config
|
|
|
|
@ -71,92 +71,77 @@ class Ingestor:
|
|
|
|
|
self.logger.debug(traceback.print_exc())
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
def iter_batches(self, data, batch_size):
|
|
|
|
|
data = iter(data)
|
|
|
|
|
while True:
|
|
|
|
|
batch = list(islice(data, batch_size))
|
|
|
|
|
if len(batch) == 0:
|
|
|
|
|
break
|
|
|
|
|
yield batch
|
|
|
|
|
|
|
|
|
|
def process(self, onions):
|
|
|
|
|
def collect_sources(self):
|
|
|
|
|
self.logger.debug("Initializing sources")
|
|
|
|
|
for name, collect, kwargs in self.config.sources():
|
|
|
|
|
# Run the source to collect onion links from clear net.
|
|
|
|
|
self.logger.info(f"Running source '{name}'")
|
|
|
|
|
try:
|
|
|
|
|
# get the generator of onions
|
|
|
|
|
source = collect(self.logger, **kwargs)
|
|
|
|
|
source.set_onionQueue(self.queue) #priority 2
|
|
|
|
|
t = source.run()
|
|
|
|
|
self.threads.append(t)
|
|
|
|
|
#self.logger.info(f'Starting of thread: {t.currentThread().name}')
|
|
|
|
|
#t.start()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
self.logger.error(e)
|
|
|
|
|
self.logger.error(traceback.print_exc())
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
def process(self, onion):
|
|
|
|
|
for operator in self.operators:
|
|
|
|
|
self.logger.info(f"Processing found onions with operator '{operator}'")
|
|
|
|
|
# Set CrawlQueue for every operator
|
|
|
|
|
self.operators[operator].set_crawlQueue(self.queue)
|
|
|
|
|
# Process list of onions
|
|
|
|
|
self.operators[operator].process(onions)
|
|
|
|
|
self.operators[operator].process(onion)
|
|
|
|
|
|
|
|
|
|
def run(self):
|
|
|
|
|
"""Run once, or forever, depending on config."""
|
|
|
|
|
self.run_once()
|
|
|
|
|
#if self.config.daemon():
|
|
|
|
|
# self.logger.info("Running forever, in a loop")
|
|
|
|
|
# self.run_forever()
|
|
|
|
|
#else:
|
|
|
|
|
# self.logger.info("Running once, to completion")
|
|
|
|
|
# self.run_once()
|
|
|
|
|
if self.config.daemon():
|
|
|
|
|
self.logger.info("Running forever, in a loop")
|
|
|
|
|
self.run_forever()
|
|
|
|
|
else:
|
|
|
|
|
self.logger.info("Running once, to completion")
|
|
|
|
|
self.run_once()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def run_once(self):
|
|
|
|
|
"""Run each source once, passing artifacts to each operator."""
|
|
|
|
|
# Start collecting sources
|
|
|
|
|
self.collect_sources()
|
|
|
|
|
# self.collect_sources()
|
|
|
|
|
# Sources will fill various queues
|
|
|
|
|
# MonitorQueue has priority high
|
|
|
|
|
# OnionQueue are those found in clearnet medium
|
|
|
|
|
# crawlQueue are those found crawling onionlinks low
|
|
|
|
|
onions = list(self.queue.queue)
|
|
|
|
|
done = False
|
|
|
|
|
if onions:
|
|
|
|
|
while not done:
|
|
|
|
|
try:
|
|
|
|
|
## Process onions with each operator.
|
|
|
|
|
for batched_onions in self.iter_batches(onions, batch_size=10):
|
|
|
|
|
self.process(batched_onions)
|
|
|
|
|
## Save Onions for each storage
|
|
|
|
|
for onion in batched_onions:
|
|
|
|
|
self.storage.save_pastie(onion[1], 30)
|
|
|
|
|
done = True
|
|
|
|
|
except Exception as e:
|
|
|
|
|
self.logger.error(e)
|
|
|
|
|
self.logger.error(traceback.print_exc())
|
|
|
|
|
break
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
print('')
|
|
|
|
|
self.logger.info("Ctrl-c received! Sending kill to threads...")
|
|
|
|
|
for t in self.threads:
|
|
|
|
|
t.kill_received = True
|
|
|
|
|
self.logger.info('Exiting')
|
|
|
|
|
sys.exit(0)
|
|
|
|
|
else:
|
|
|
|
|
for t in self.threads:
|
|
|
|
|
t.kill_received = True
|
|
|
|
|
self.logger.info(f"Sleeping for {self.config.sleep()} seconds")
|
|
|
|
|
time.sleep(self.config.sleep())
|
|
|
|
|
|
|
|
|
|
while not done:
|
|
|
|
|
try:
|
|
|
|
|
onion = self.queue.get(True, 5)
|
|
|
|
|
## Process onions with each operator.
|
|
|
|
|
self.process(onion)
|
|
|
|
|
## Save Onions for each storage
|
|
|
|
|
self.storage.save_pastie(onion[1], 30)
|
|
|
|
|
except Empty:
|
|
|
|
|
self.logger.info('Queue is empty')
|
|
|
|
|
done = True
|
|
|
|
|
except Exception as e:
|
|
|
|
|
self.logger.error(e)
|
|
|
|
|
self.logger.error(traceback.print_exc())
|
|
|
|
|
break
|
|
|
|
|
except KeyboardInterrupt:
|
|
|
|
|
print('')
|
|
|
|
|
self.logger.info("Ctrl-c received! Sending kill to threads...")
|
|
|
|
|
for t in self.threads:
|
|
|
|
|
t.kill_received = True
|
|
|
|
|
self.logger.info('Exiting')
|
|
|
|
|
sys.exit(0)
|
|
|
|
|
|
|
|
|
|
def run_forever(self):
|
|
|
|
|
"""Run forever, sleeping for the configured interval between each run."""
|
|
|
|
|
while True:
|
|
|
|
|
self.run_once()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def collect_sources(self):
|
|
|
|
|
self.logger.debug("Initializing sources")
|
|
|
|
|
for name, collect, kwargs in self.config.sources():
|
|
|
|
|
# Run the source to collect onion links from clear net.
|
|
|
|
|
self.logger.info(f"Running source '{name}'")
|
|
|
|
|
try:
|
|
|
|
|
# get the generator of onions
|
|
|
|
|
source = collect(self.logger, **kwargs)
|
|
|
|
|
source.set_onionQueue(self.queue) #priority 2
|
|
|
|
|
t = source.run()
|
|
|
|
|
self.threads.append(t)
|
|
|
|
|
#self.logger.info(f'Starting of thread: {t.currentThread().name}')
|
|
|
|
|
#t.start()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
self.logger.error(e)
|
|
|
|
|
self.logger.error(traceback.print_exc())
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
self.logger.debug(f"Sleeping for {self.config.sleep()} seconds")
|
|
|
|
|
time.sleep(self.config.sleep())
|
|
|
|
|