Revamp seedleech count collection
Add seedleech-daemon, which takes care of keeping seed/leech counts updated
parent 2e386bd45a
commit 235bd9df8c
@ -0,0 +1,48 @@
package main

import (
	"log"
	"strconv"
	"time"

	"github.com/BurntSushi/toml"
)

type Tomlconf struct {
	Trackers    []string
	WaitTime    string            // time to wait between requests to one tracker
	LogInterval string            // interval between stats dumps to console
	Categories  map[string]string // Acceptable freshness of seed/leech counts per category of torrents. The key is the minimum seed count for a torrent to be assigned to that category; each torrent/tracker pair is fetched independently and belongs to the highest category it qualifies for.
}

func loadConfig() config {
	var tomlconf Tomlconf // holds the parsed config, ready for translation from strings to durations
	if _, err := toml.DecodeFile("config.toml", &tomlconf); err != nil {
		log.Fatal(err)
	}

	conf := config{trackers: tomlconf.Trackers}
	wt, err := time.ParseDuration(tomlconf.WaitTime)
	if err != nil {
		log.Println(tomlconf)
		log.Fatal(err)
	}
	conf.waitTime = wt
	li, err := time.ParseDuration(tomlconf.LogInterval)
	if err != nil {
		log.Println(tomlconf.LogInterval)
		log.Fatal(err)
	}
	conf.logInterval = li
	conf.categories = make(map[int]time.Duration)
	for k, v := range tomlconf.Categories {
		ka, err := strconv.Atoi(k)
		if err != nil { // a category key that isn't a number
			log.Println(k)
			log.Fatal(err)
		}
		conf.categories[ka], err = time.ParseDuration(v)
		if err != nil {
			log.Println(v)
			log.Fatal(err)
		}
	}
	return conf
}
@ -0,0 +1,6 @@
trackers = ["udp://tracker.coppersurfer.tk:6969", "udp://tracker.pirateparty.gr:6969", "udp://tracker.leechers-paradise.org:6969/announce", "udp://tracker.internetwarriors.net:1337/announce"]
waitTime = "500ms"
logInterval = "5m"

categories = { 0 = "1440h", 1 = "720h", 3 = "240h", 10 = "120h", 100 = "24h", 1000 = "12h", 2000 = "6h", 3000 = "3h" }
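To make the categories semantics concrete, here is an illustrative sketch (a hypothetical helper, not part of the commit) of how a seed count maps to a refresh interval: a torrent/tracker pair uses the highest category whose minimum seed count it meets, so with the sample config above a torrent with 1500 seeders falls into the 1000 category and is rescraped once its count is older than 12h.

package main

import (
	"fmt"
	"time"
)

// categoryFor returns the acceptable freshness for a given seed count by
// picking the highest qualifying category, i.e. the largest key <= seeders.
func categoryFor(categories map[int]time.Duration, seeders int) time.Duration {
	best := -1
	var maxAge time.Duration
	for minSeed, age := range categories {
		if seeders >= minSeed && minSeed > best {
			best = minSeed
			maxAge = age
		}
	}
	return maxAge
}

func main() {
	categories := map[int]time.Duration{0: 1440 * time.Hour, 1000: 12 * time.Hour, 3000: 3 * time.Hour}
	fmt.Println(categoryFor(categories, 1500)) // prints 12h0m0s
}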
@ -0,0 +1,216 @@
// The seedleech daemon keeps seed/leech counts as fresh as possible, automatically and around the clock.
package main

import (
	"database/sql"
	"fmt"
	"log"
	"math/rand"
	"os"
	"os/signal"
	"strconv"
	"syscall"
	"time"

	"github.com/etix/goscrape"
	"github.com/lib/pq"
)

// Config file definition. Loaded from config.toml in the working directory. All fields are mandatory.
type config struct {
	trackers    []string
	waitTime    time.Duration         // time to wait between requests to one tracker
	logInterval time.Duration         // interval between stats dumps to console
	categories  map[int]time.Duration // Acceptable freshness of seed/leech counts per category of torrents. The key is the minimum seed count for a torrent to be assigned to that category; each torrent/tracker pair is fetched independently and belongs to the highest category it qualifies for.
}

type trackerResponse struct {
	tracker      string
	scrapeResult []*goscrape.ScrapeResult
}

var conf config

func main() {
	conf = loadConfig()
	db := initDb()

	trackerResponses := make(chan trackerResponse, 100)
	trackerRequests := make(map[string]chan []string)

	sigterm := make(chan os.Signal, 1) // buffered, so signal.Notify never has to drop the signal
	signal.Notify(sigterm, os.Interrupt, syscall.SIGTERM)
	quit := false // written once by the signal goroutine, polled by the workers
	go func() {
		<-sigterm
		quit = true
	}()

	for _, tracker := range conf.trackers {
		trackerRequests[tracker] = make(chan []string) // unbuffered (!), so work fetchers block until the scraper is ready
		go runScraper(trackerRequests[tracker], trackerResponses, tracker)
		for minseed, delay := range conf.categories {
			go runWorkFetcher(trackerRequests[tracker], tracker, minseed, delay, &quit, db)
		}
	}

	go runPersister(trackerResponses, db)

	for {
		// check for shutdown twice per log interval
		time.Sleep(conf.logInterval / 2)
		if quit {
			return
		}
		time.Sleep(conf.logInterval / 2)
		if quit {
			return
		}
		for _, tracker := range conf.trackers {
			for minSeed, maxAge := range conf.categories {
				freshlimit := time.Now().Local().Add(-maxAge)
				if minSeed != 0 {
					var res int
					row := db.QueryRow("SELECT count(1) FROM trackerdata WHERE tracker = $1 AND seeders > $2 AND scraped < $3", tracker, minSeed, freshlimit)
					row.Scan(&res)
					if res > 0 {
						fmt.Println("Tracker " + tracker + ", seeds > " + strconv.Itoa(minSeed) + ": " + strconv.Itoa(res))
					}
				} else {
					// the 0 category: torrents this tracker has no counts for at all
					var res int
					row := db.QueryRow("SELECT count(1) FROM torrent")
					row.Scan(&res)
					totalTorrents := res
					row = db.QueryRow("SELECT count(1) FROM trackerdata WHERE tracker = $1", tracker)
					row.Scan(&res)
					if (totalTorrents - res) > 0 {
						fmt.Println("Tracker " + tracker + ", seeds = 0: " + strconv.Itoa(totalTorrents-res))
					}
				}
			}
		}
	}
}

// a work fetcher for a given tracker and category combination
func runWorkFetcher(trackerRequests chan []string, tracker string, minseed int, maxAge time.Duration, quit *bool, db *sql.DB) {
	for {
		if *quit {
			fmt.Println("Workfetcher for category " + strconv.Itoa(minseed) + ", tracker " + tracker + " stopping.")
			return
		}
		freshlimit := time.Now().Local().Add(-maxAge)
		var rows *sql.Rows
		var err error
		if minseed != 0 {
			rows, err = db.Query("SELECT infohash FROM trackerdata WHERE tracker = $1 AND seeders > $2 AND scraped < $3 LIMIT 630", tracker, minseed, freshlimit)
		} else {
			time.Sleep(time.Duration(rand.Intn(6000)) * time.Second) // stagger the full-table scans: sleep for a random time between 0 and 100 minutes
			rows, err = db.Query("SELECT infohash FROM torrent WHERE NOT EXISTS (SELECT FROM trackerdata WHERE infohash = torrent.infohash AND tracker = $1 AND scraped > $2) LIMIT 6300", tracker, freshlimit)
		}
		if err != nil {
			log.Fatal(err)
		}
		var infohashes []string
		for rows.Next() {
			var infohash string
			if err := rows.Scan(&infohash); err != nil {
				log.Fatal(err)
			}
			if len(infohashes) < 74 { // a UDP scrape request fits at most 74 infohashes
				infohashes = append(infohashes, infohash)
			} else {
				if *quit {
					rows.Close()
					fmt.Println("Workfetcher for category " + strconv.Itoa(minseed) + ", tracker " + tracker + " stopping.")
					return
				}
				trackerRequests <- infohashes
				infohashes = []string{infohash} // start the next batch with the row that triggered the flush
			}
		}
		rows.Close() // close explicitly; a defer here would pile up across loop iterations
		trackerRequests <- infohashes
		time.Sleep(time.Minute)
	}
}

// a scraper for one tracker
func runScraper(trackerRequests chan []string, trackerResponses chan trackerResponse, tracker string) {
	s, err := goscrape.New(tracker)
	if err != nil { // check the error before touching s
		log.Fatal("Error:", err)
	}
	s.SetTimeout(conf.waitTime)
	s.SetRetryLimit(1)
	for req := range trackerRequests {
		infohashes := make([][]byte, len(req))
		for i, v := range req {
			if len(v) != 40 { // infohashes are 40 chars long in string representation
				panic("Infohash in trackerRequest with index " + strconv.Itoa(i) + " isn't 40 chars long, it's " + strconv.Itoa(len(v)) + " chars long.")
			}
			infohashes[i] = []byte(v)
		}
		res, err := s.Scrape(infohashes...)
		if err != nil {
			log.Println(tracker)
			log.Println(err)
		} else {
			trackerResponses <- trackerResponse{tracker, res}
		}

		time.Sleep(conf.waitTime)
	}
}

func runPersister(trackerResponses chan trackerResponse, db *sql.DB) {
	for res := range trackerResponses {
		for _, scrapeResult := range res.scrapeResult {
			// insert first; if the torrent/tracker combo is already in the DB, fall back to an update
			timestamp := time.Now()
			_, err := db.Exec("INSERT INTO trackerdata (infohash, tracker, seeders, leechers, completed, scraped) VALUES ($1, $2, $3, $4, $5, $6)", scrapeResult.Infohash, res.tracker, scrapeResult.Seeders, scrapeResult.Leechers, scrapeResult.Completed, timestamp)
			if err != nil {
				pgerr, ok := err.(*pq.Error)
				if !ok || pgerr.Code != "23505" { // anything but a duplicate-key violation is fatal
					log.Fatal(err)
				}
				_, err = db.Exec("UPDATE trackerdata SET seeders = $3, leechers = $4, completed = $5, scraped = $6 WHERE infohash = $1 AND tracker = $2", scrapeResult.Infohash, res.tracker, scrapeResult.Seeders, scrapeResult.Leechers, scrapeResult.Completed, timestamp)
				if err != nil {
					log.Fatal(err)
				}
			}
		}
	}
}

func initDb() *sql.DB {
	connStr := "user=nextgen dbname=nextgen host=/var/run/postgresql"
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}

	_, err = db.Exec(`DO $$
	BEGIN
		IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'tracker') THEN
			CREATE TYPE tracker AS ENUM ('udp://tracker.coppersurfer.tk:6969', 'udp://exodus.desync.com:6969', 'udp://tracker.pirateparty.gr:6969');
		END IF;
	END$$`)
	if err != nil {
		log.Fatal(err)
	}

	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS trackerdata (
		infohash char(40),
		tracker tracker,
		seeders int NOT NULL,
		leechers int NOT NULL,
		completed int NOT NULL,
		scraped timestamp,
		PRIMARY KEY (infohash, tracker)
	)`)
	if err != nil {
		log.Fatal(err)
	}
	return db
}
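One possible simplification of runPersister, assuming Postgres 9.5 or newer (an assumption this commit does not state): the insert-then-update-on-duplicate pair could become a single upsert, since (infohash, tracker) is the table's primary key. A hypothetical sketch:

// persistResult is a hypothetical replacement for the body of runPersister's
// inner loop: one round trip instead of an INSERT followed by an UPDATE on
// duplicate-key errors. Assumes Postgres 9.5+ for ON CONFLICT support.
func persistResult(db *sql.DB, tracker string, r *goscrape.ScrapeResult) error {
	_, err := db.Exec(`INSERT INTO trackerdata (infohash, tracker, seeders, leechers, completed, scraped)
		VALUES ($1, $2, $3, $4, $5, $6)
		ON CONFLICT (infohash, tracker) DO UPDATE
		SET seeders = EXCLUDED.seeders, leechers = EXCLUDED.leechers,
		    completed = EXCLUDED.completed, scraped = EXCLUDED.scraped`,
		r.Infohash, tracker, r.Seeders, r.Leechers, r.Completed, time.Now())
	return err
}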
@ -0,0 +1,12 @@
[Unit]
Description=nextgen seedleech daemon
Requires=postgresql.service
After=postgresql.service

[Service]
User=nextgen
WorkingDirectory=/home/nextgen
ExecStart=/home/nextgen/seedleech-daemon
Restart=always

[Install]
WantedBy=multi-user.target
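To install the unit (assuming it ships as seedleech-daemon.service), copy it to /etc/systemd/system/, then run systemctl daemon-reload followed by systemctl enable --now seedleech-daemon.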
@ -1,17 +1,17 @@
-- SELECT for the index-ready materialized view "fresh"
select distinct on (infohash) trackerdata.infohash, torrent.name, torrent.length, trackerdata.seeders, trackerdata.leechers, trackerdata.completed, tracker from trackerdata inner join torrent on (trackerdata.infohash = torrent.infohash) where torrent.copyrighted != 't' order by infohash, scraped asc, seeders desc;
-- to create this view, run: CREATE MATERIALIZED VIEW ... AS (paste the SELECT from above)

-- see which trackers are the most used
select sum(seeders), tracker from trackerdata group by tracker;

-- get the highest seed/leech count found for every torrent, using data from the tracker with the most seeds
select distinct on (infohash) infohash, seeders, leechers, completed, tracker from trackerdata where completed != 0 order by infohash, scraped asc, seeders desc;

-- generate top 100 by seeders:
select * from (select distinct on (trackerdata.infohash) trackerdata.infohash, torrent.name, seeders, leechers from trackerdata inner join torrent on (trackerdata.infohash = torrent.infohash) where completed != 0 order by infohash, seeders desc) as subquery order by seeders desc limit 100;
-- generate top 100 by seeders, from the materialized view:
SELECT torrent.name, fresh.* from fresh INNER JOIN torrent ON torrent.infohash = fresh.infohash ORDER BY s desc limit 100;

-- count torrents added per day since 2019-01-15
SELECT added::date, count(infohash)
from torrent where added > '2019-01-15'::date
group by added::date order by count desc;

CREATE MATERIALIZED VIEW fresh AS
SELECT infohash,
	max(seeders) AS s,
	max(leechers) AS l,
	max(completed) AS c
FROM trackerdata
GROUP BY infohash;
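One assumption worth noting: REFRESH MATERIALIZED VIEW CONCURRENTLY fresh would keep the view readable while the update script refreshes it, but Postgres then requires a unique index on the view, e.g. CREATE UNIQUE INDEX ON fresh (infohash); this commit does not set that up.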
@ -1,3 +0,0 @@
go build
scp ./tracker-scraper user@server:/home/nextgen/
ssh user@server sudo -u nextgen /home/nextgen/tracker-scraper
@ -1,166 +0,0 @@
package main

import (
	"database/sql"
	"log"
	"strconv"
	"sync/atomic"
	"time"

	"github.com/etix/goscrape"
	_ "github.com/lib/pq"
)

const retryLimit = 3
const waitTime = 250 // in ms
var trackers = [3]string{"udp://tracker.coppersurfer.tk:6969", "udp://exodus.desync.com:6969", "udp://tracker.pirateparty.gr:6969"}

func main() {
	db := initDb()
	trackerResponses := make(chan trackerResponse, 100)
	trackerRequests := make(map[string]chan []string)

	var counter uint64 // count of torrents scraped

	quitCounter := make(chan bool)
	go func() {
		for {
			select {
			case <-quitCounter:
				return
			default:
				log.Println("Torrents scraped so far: " + strconv.Itoa(int(atomic.LoadUint64(&counter))))
				time.Sleep(2 * time.Second)
			}
		}
	}()

	for _, tracker := range trackers {
		trackerRequests[tracker] = make(chan []string, 1000)
		go runScraper(trackerRequests[tracker], trackerResponses, tracker, waitTime, &counter)
	}

	datestamp := time.Now().Local().Format("2006-01-02")

	go runPersister(trackerResponses, db, datestamp)

	rows, err := db.Query("SELECT infohash FROM torrent WHERE NOT EXISTS (SELECT FROM trackerdata WHERE infohash = torrent.infohash AND scraped = '" + datestamp + "')")
	if err != nil {
		log.Fatal(err)
	}
	defer rows.Close()
	var infohashes []string
	for rows.Next() {
		var infohash string
		if err := rows.Scan(&infohash); err != nil {
			log.Fatal(err)
		}
		if len(infohashes) < 74 {
			infohashes = append(infohashes, infohash)
		} else {
			for _, tracker := range trackers {
				trackerRequests[tracker] <- infohashes
			}
			infohashes = []string{}
		}
	}
	for _, tracker := range trackers {
		trackerRequests[tracker] <- infohashes
	}

	quitCounter <- true

	for len(trackerRequests) > 0 {
		time.Sleep(2 * time.Second)
		var left int
		for i, tracker := range trackers {
			left = len(trackerRequests[tracker])
			if left != 0 {
				log.Println("Tracker " + strconv.Itoa(i) + " requests left to send: " + strconv.Itoa(len(trackerRequests[tracker])))
			}
		}
	}

	for _, tracker := range trackers {
		close(trackerRequests[tracker])
	}

	for len(trackerResponses) > 0 {
		time.Sleep(2 * time.Second)
		log.Println("Tracker responses left to save: " + strconv.Itoa(len(trackerResponses)))
	}
	time.Sleep(time.Duration(waitTime*retryLimit) * time.Millisecond)
	close(trackerResponses)
}

// Runs a scraper for the given tracker. Takes requests from trackerRequests and sends responses to trackerResponses.
// waittime is in milliseconds.
func runScraper(trackerRequests chan []string, trackerResponses chan trackerResponse, tracker string, waittime int, counter *uint64) {
	s, err := goscrape.New(tracker)
	s.SetTimeout(time.Duration(waitTime) * time.Millisecond)
	s.SetRetryLimit(retryLimit)
	if err != nil {
		log.Fatal("Error:", err)
	}
	for req := range trackerRequests {
		infohashes := make([][]byte, len(req))
		for i, v := range req {
			if len(v) != 40 { // infohashes are 40 chars long in string representation
				panic("Infohash in trackerRequest with index " + strconv.Itoa(i) + " isn't 40 chars long, it's " + strconv.Itoa(len(v)) + " chars long.")
			}
			infohashes[i] = []byte(v)
		}
		res, err := s.Scrape(infohashes...)
		if err != nil {
			log.Println(err)
		} else {
			atomic.AddUint64(counter, uint64(len(infohashes)))
			trackerResponses <- trackerResponse{tracker, res}
		}

		time.Sleep(time.Duration(waittime) * time.Millisecond)
	}
}

func runPersister(trackerResponses chan trackerResponse, db *sql.DB, datestamp string) {
	for res := range trackerResponses {
		for _, scrapeResult := range res.scrapeResult {
			_, err := db.Exec("INSERT INTO trackerdata (infohash, tracker, seeders, leechers, completed, scraped) VALUES ($1, $2, $3, $4, $5, $6)", scrapeResult.Infohash, res.tracker, scrapeResult.Seeders, scrapeResult.Leechers, scrapeResult.Completed, datestamp)
			if err != nil {
				log.Fatal(err)
			}
		}
	}
}

func initDb() *sql.DB {
	connStr := "user=nextgen dbname=nextgen host=/var/run/postgresql"
	db, err := sql.Open("postgres", connStr)
	if err != nil {
		log.Fatal(err)
	}

	/*_, err = db.Exec(`CREATE TYPE tracker AS ENUM ('udp://tracker.coppersurfer.tk:6969', 'udp://exodus.desync.com:6969', 'udp://tracker.pirateparty.gr:6969')`)
	if err != nil {
		log.Fatal(err)
	}*/

	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS trackerdata (
		infohash char(40),
		tracker tracker,
		seeders int NOT NULL,
		leechers int NOT NULL,
		completed int NOT NULL,
		scraped char(10),
		PRIMARY KEY (infohash, scraped, tracker)
	)`)
	if err != nil {
		log.Fatal(err)
	}
	return db
}

type trackerResponse struct {
	tracker      string
	scrapeResult []*goscrape.ScrapeResult
}
@ -1,19 +1,15 @@
# This script updates the index. Testing and uploading to server/IPFS is done manually.

echo "Scraping trackers for seed/leech data"
mosh nextgen@server "~/tracker-scraper" # you can use ssh instead of mosh as well

echo "Refreshing database"
ssh nextgen@server "psql -c 'REFRESH MATERIALIZED VIEW fresh'"

echo "Generating index dump"
echo "Downloading dump"
rm index-generator/dump.csv
ssh nextgen@server "psql -c '\copy (select * from fresh) to stdout with (format csv)'" > index-generator/dump.csv

ssh nextgen@server "psql -c '\copy (select fresh.infohash, torrent.name, torrent.length, fresh.s, fresh.l, fresh.c from fresh inner join torrent on torrent.infohash = fresh.infohash) to stdout with (format csv)'" > index-generator/dump.csv
echo "Generating index"
(cd index-generator; node --max-old-space-size=10000 main.js)
python3 index-generator/fix-metajson.py website/generated/inx

echo "Generating top torrents list"
generate-top-torrents/generate-top-torrents > website/generated/top.json

echo "Uploading website"
cd website
rsync -ar ./ root@server:/www/torrent-paradise.ml # consider using --progress