diff --git a/README.md b/README.md
index 0d4c93c..05db47d 100644
--- a/README.md
+++ b/README.md
@@ -34,7 +34,7 @@ What I did first after getting the server up and running was importing the TPB d
 
 ## Generate the index
 
-See `update-index.sh`. Before running it for the first time, you should create the materialized view *fresh*. For instructions, see the first paragraph of snippets.sql.
+See `update-index.sh`. Before running it for the first time, create the materialized view *fresh* (the `CREATE MATERIALIZED VIEW` statement at the end of snippets.sql).
 
 ## Spider the DHT
 
@@ -42,9 +42,7 @@ Run `go build` in spider/ to compile and scp the binary it to the server. You ca
 
 ## Scraping trackers for seed/leech data
 
-Run `go build` in tracker-scraper/ to compile and scp the binary it to the server. Run it every time you want to fetch new seed/leech data for all torrents.
-
-`tracker-scraper` saves the results into the *trackerdata* table in database.
+Run `go build` in seedleech-daemon/ to compile and scp the binary to the server. You can use the systemd service file in `seedleech-daemon/seedleech.service`.
 
 # Contributing
 
diff --git a/seedleech-daemon/config.go b/seedleech-daemon/config.go
new file mode 100644
index 0000000..6750016
--- /dev/null
+++ b/seedleech-daemon/config.go
@@ -0,0 +1,48 @@
+package main
+
+import (
+	"log"
+	"strconv"
+	"time"
+
+	"github.com/BurntSushi/toml"
+)
+
+type Tomlconf struct {
+	Trackers    []string
+	WaitTime    string // time to wait between requests to one tracker
+	LogInterval string // interval between stats dumps to console
+	// Categories defines the acceptable freshness of seed/leech counts. Each key is the
+	// minimum seeder count for a torrent to fall into that category; every torrent/tracker
+	// pair is fetched independently and belongs to the highest category it qualifies for.
+	Categories map[string]string
+}
+
+func loadConfig() config {
+	var tomlconf Tomlconf // raw parsed config; durations still have to be converted from strings
+	if _, err := toml.DecodeFile("config.toml", &tomlconf); err != nil {
+		log.Fatal(err)
+	}
+
+	conf := config{trackers: tomlconf.Trackers}
+	wt, err := time.ParseDuration(tomlconf.WaitTime)
+	if err != nil {
+		log.Println(tomlconf.WaitTime)
+		log.Fatal(err)
+	}
+	conf.waitTime = wt
+	li, err := time.ParseDuration(tomlconf.LogInterval)
+	if err != nil {
+		log.Println(tomlconf.LogInterval)
+		log.Fatal(err)
+	}
+	conf.logInterval = li
+	conf.categories = make(map[int]time.Duration)
+	for k, v := range tomlconf.Categories {
+		ka, err := strconv.Atoi(k)
+		if err != nil {
+			log.Println(k)
+			log.Fatal(err)
+		}
+		conf.categories[ka], err = time.ParseDuration(v)
+		if err != nil {
+			log.Println(v)
+			log.Fatal(err)
+		}
+	}
+	return conf
+}
diff --git a/seedleech-daemon/config.toml b/seedleech-daemon/config.toml
new file mode 100644
index 0000000..18fff75
--- /dev/null
+++ b/seedleech-daemon/config.toml
@@ -0,0 +1,6 @@
+trackers = ["udp://tracker.coppersurfer.tk:6969", "udp://tracker.pirateparty.gr:6969", "udp://tracker.leechers-paradise.org:6969/announce", "udp://tracker.internetwarriors.net:1337/announce"]
+waitTime = "500ms"
+logInterval = "5m"
+
+categories = { 0 = "1440h", 1 = "720h", 3 = "240h", 10 = "120h", 100 = "24h", 1000 = "12h", 2000 = "6h", 3000 = "3h"}
+
\ No newline at end of file
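A note on the category mechanism configured above: each key in `categories` is a minimum seeder count and each value is the longest a scrape for torrents at or above that count is allowed to age; a torrent/tracker pair belongs to the highest category it qualifies for. The standalone sketch below (the `maxAgeFor` helper is hypothetical, not part of the daemon) shows how a seeder count maps to a refresh interval once the config has been parsed into a `map[int]time.Duration`.

```go
package main

import (
	"fmt"
	"time"
)

// maxAgeFor returns the refresh interval for a given seeder count: the value of
// the highest category whose minimum-seeders key is <= seeders. Hypothetical
// helper for illustration; the daemon itself queries the database per category.
func maxAgeFor(seeders int, categories map[int]time.Duration) time.Duration {
	best := -1
	var age time.Duration
	for minSeeders, maxAge := range categories {
		if seeders >= minSeeders && minSeeders > best {
			best = minSeeders
			age = maxAge
		}
	}
	return age
}

func main() {
	categories := map[int]time.Duration{
		0:    1440 * time.Hour,
		100:  24 * time.Hour,
		1000: 12 * time.Hour,
	}
	fmt.Println(maxAgeFor(5000, categories)) // 12h0m0s
	fmt.Println(maxAgeFor(3, categories))    // 1440h0m0s
}
```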
diff --git a/seedleech-daemon/seedleech-daemon.go b/seedleech-daemon/seedleech-daemon.go
new file mode 100644
index 0000000..a14f394
--- /dev/null
+++ b/seedleech-daemon/seedleech-daemon.go
@@ -0,0 +1,216 @@
+// The seedleech daemon runs 24/7 and keeps seed/leech counts as fresh as the configured categories allow.
+package main
+
+import (
+	"database/sql"
+	"fmt"
+	"log"
+	"math/rand"
+	"os"
+	"os/signal"
+	"strconv"
+	"syscall"
+	"time"
+
+	"github.com/etix/goscrape"
+	"github.com/lib/pq"
+)
+
+// Config file definition. Loaded from config.toml in the working directory. All fields are mandatory.
+type config struct {
+	trackers    []string
+	waitTime    time.Duration // time to wait between requests to one tracker
+	logInterval time.Duration // interval between stats dumps to console
+	// categories defines the acceptable freshness of seed/leech counts. Each key is the
+	// minimum seeder count for a torrent to fall into that category; every torrent/tracker
+	// pair is fetched independently and belongs to the highest category it qualifies for.
+	categories map[int]time.Duration
+}
+
+type trackerResponse struct {
+	tracker      string
+	scrapeResult []*goscrape.ScrapeResult
+}
+
+var conf config
+
+func main() {
+	conf = loadConfig()
+	db := initDb()
+
+	trackerResponses := make(chan trackerResponse, 100)
+	trackerRequests := make(map[string]chan []string)
+
+	sigterm := make(chan os.Signal, 1) // signal.Notify requires a buffered channel
+	signal.Notify(sigterm, os.Interrupt, syscall.SIGTERM)
+	quit := false // shutdown flag polled by the work fetchers and the main loop
+	go func() {
+		<-sigterm
+		quit = true
+	}()
+
+	for _, tracker := range conf.trackers {
+		trackerRequests[tracker] = make(chan []string) // unbuffered on purpose: work fetchers block until the scraper is ready
+		go runScraper(trackerRequests[tracker], trackerResponses, tracker)
+		for minseed, delay := range conf.categories {
+			go runWorkFetcher(trackerRequests[tracker], tracker, minseed, delay, &quit, db)
+		}
+	}
+
+	go runPersister(trackerResponses, db)
+
+	for {
+		time.Sleep(conf.logInterval / 2)
+		if quit {
+			return
+		}
+		time.Sleep(conf.logInterval / 2)
+		if quit {
+			return
+		}
+		for _, tracker := range conf.trackers {
+			for minSeed, maxAge := range conf.categories {
+				freshlimit := time.Now().Local().Add(-maxAge)
+				if minSeed != 0 {
+					var res int
+					row := db.QueryRow("SELECT count(1) FROM trackerdata WHERE tracker = $1 AND seeders > $2 AND scraped < $3", tracker, minSeed, freshlimit)
+					row.Scan(&res)
+					if res > 0 {
+						fmt.Println("Tracker " + tracker + ", seeds > " + strconv.Itoa(minSeed) + ": " + strconv.Itoa(res))
+					}
+				} else {
+					var res int
+					row := db.QueryRow("SELECT count(1) from torrent")
+					row.Scan(&res)
+					totalTorrents := res
+					row = db.QueryRow("SELECT count(1) from trackerdata where tracker = $1", tracker)
+					row.Scan(&res)
+					if (totalTorrents - res) > 0 {
+						fmt.Println("Tracker " + tracker + ", seeds = 0: " + strconv.Itoa(totalTorrents-res))
+					}
+				}
+			}
+		}
+	}
+}
+
+// a work fetcher for a given tracker and category combination
+func runWorkFetcher(trackerRequests chan []string, tracker string, minseed int, maxAge time.Duration, quit *bool, db *sql.DB) {
+	for {
+		if *quit {
+			fmt.Println("Workfetcher for category " + strconv.Itoa(minseed) + ", tracker " + tracker + " stopping.")
+			return
+		}
+		freshlimit := time.Now().Local().Add(-maxAge)
+		var rows *sql.Rows
+		var err error
+		if minseed != 0 {
+			rows, err = db.Query("SELECT infohash FROM trackerdata WHERE tracker = $1 AND seeders > $2 AND scraped < $3 LIMIT 630", tracker, minseed, freshlimit)
+		} else {
+			time.Sleep(time.Duration(int64(rand.Intn(6000)) * int64(time.Second))) // sleep for a random time between 0 and 100 minutes to spread out the full-table scans
+			rows, err = db.Query("SELECT infohash FROM torrent WHERE NOT EXISTS (SELECT from trackerdata WHERE infohash = torrent.infohash AND tracker = $1 AND scraped > $2) LIMIT 6300", tracker, freshlimit)
+		}
+		if err != nil {
+			log.Fatal(err)
+		}
+		var infohashes []string
+		for rows.Next() {
+			var infohash string
+			if err := rows.Scan(&infohash); err != nil {
+				log.Fatal(err)
+			}
+			if len(infohashes) < 74 {
+				infohashes = append(infohashes, infohash)
+			} else {
+				if *quit {
+					fmt.Println("Workfetcher for category " + strconv.Itoa(minseed) + ", tracker " + tracker + " stopping.")
+					return
+				}
+				trackerRequests <- infohashes
+				infohashes = []string{infohash} // start the next batch with the row that didn't fit
+			}
+		}
+		rows.Close() // close explicitly; a deferred close would pile up in this endless loop
+		trackerRequests <- infohashes
+		time.Sleep(time.Minute)
+	}
+}
+
+// a scraper for one tracker
+func runScraper(trackerRequests chan []string, trackerResponses chan trackerResponse, tracker string) {
+	s, err := goscrape.New(tracker)
+	if err != nil {
+		log.Fatal("Error:", err)
+	}
+	s.SetTimeout(conf.waitTime)
+	s.SetRetryLimit(1)
+	for req := range trackerRequests {
+		infohashes := make([][]byte, len(req))
+		for i, v := range req {
+			if len(v) != 40 { // infohashes are 40 chars long in string representation
+				panic("Infohash in trackerRequest with index " + strconv.Itoa(i) + " isn't 40 chars long, it's " + strconv.Itoa(len(v)) + " long.")
+			}
+			infohashes[i] = []byte(v)
+		}
+		res, err := s.Scrape(infohashes...)
+		if err != nil {
+			log.Println(tracker)
+			log.Println(err)
+		} else {
+			trackerResponses <- trackerResponse{tracker, res}
+		}
+
+		time.Sleep(conf.waitTime)
+	}
+}
+
+func runPersister(trackerResponses chan trackerResponse, db *sql.DB) {
+	for res := range trackerResponses {
+		for _, scrapeResult := range res.scrapeResult {
+			timestamp := time.Now()
+			_, err := db.Exec("INSERT INTO trackerdata (infohash, tracker, seeders, leechers, completed, scraped) VALUES ($1, $2, $3, $4, $5, $6)", scrapeResult.Infohash, res.tracker, scrapeResult.Seeders, scrapeResult.Leechers, scrapeResult.Completed, timestamp)
+			if pgerr, ok := err.(*pq.Error); ok && pgerr.Code == "23505" {
+				// this infohash/tracker pair is already in the table: update the existing row instead
+				_, err = db.Exec("UPDATE trackerdata SET seeders = $3, leechers = $4, completed = $5, scraped = $6 WHERE infohash = $1 AND tracker = $2", scrapeResult.Infohash, res.tracker, scrapeResult.Seeders, scrapeResult.Leechers, scrapeResult.Completed, timestamp)
+			}
+			if err != nil {
+				log.Fatal(err)
+			}
+		}
+	}
+}
+
+func initDb() *sql.DB {
+	connStr := "user=nextgen dbname=nextgen host=/var/run/postgresql"
+	db, err := sql.Open("postgres", connStr)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Note: every tracker listed in config.toml must be a value of this enum.
+	_, err = db.Exec(`DO $$
+BEGIN
+	IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = 'tracker') THEN
+		CREATE TYPE tracker AS ENUM ('udp://tracker.coppersurfer.tk:6969', 'udp://exodus.desync.com:6969', 'udp://tracker.pirateparty.gr:6969', 'udp://tracker.leechers-paradise.org:6969/announce', 'udp://tracker.internetwarriors.net:1337/announce');
+	END IF;
+END$$`)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS trackerdata (
+		infohash char(40),
+		tracker tracker,
+		seeders int NOT NULL,
+		leechers int NOT NULL,
+		completed int NOT NULL,
+		scraped timestamp,
+		PRIMARY KEY (infohash, tracker)
+	)`)
+	if err != nil {
+		log.Fatal(err)
+	}
+	return db
+}
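runPersister above first INSERTs and falls back to an UPDATE when the insert trips the (infohash, tracker) primary key (SQLSTATE 23505). On PostgreSQL 9.5 or newer the same effect fits in one statement with `INSERT ... ON CONFLICT`. The sketch below is a possible variant, not what the daemon currently does; it only relies on the schema created in initDb and on packages seedleech-daemon.go already imports.

```go
// persistScrape sketches a single-statement alternative to runPersister's
// insert-then-update-on-23505 logic, assuming the trackerdata table and the
// (infohash, tracker) primary key created in initDb. Requires PostgreSQL 9.5+.
func persistScrape(db *sql.DB, tracker string, r *goscrape.ScrapeResult) error {
	_, err := db.Exec(`INSERT INTO trackerdata (infohash, tracker, seeders, leechers, completed, scraped)
		VALUES ($1, $2, $3, $4, $5, $6)
		ON CONFLICT (infohash, tracker) DO UPDATE
		SET seeders = EXCLUDED.seeders, leechers = EXCLUDED.leechers,
		    completed = EXCLUDED.completed, scraped = EXCLUDED.scraped`,
		r.Infohash, tracker, r.Seeders, r.Leechers, r.Completed, time.Now())
	return err
}
```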
diff --git a/seedleech-daemon/seedleech.service b/seedleech-daemon/seedleech.service
new file mode 100644
index 0000000..a061b72
--- /dev/null
+++ b/seedleech-daemon/seedleech.service
@@ -0,0 +1,13 @@
+[Unit]
+Description=nextgen seedleech daemon
+Requires=postgresql.service
+After=postgresql.service
+
+[Service]
+User=nextgen
+WorkingDirectory=/home/nextgen
+ExecStart=/home/nextgen/seedleech-daemon
+Restart=always
+
+[Install]
+WantedBy=multi-user.target
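When this unit is stopped, systemd sends the daemon SIGTERM, which the code above turns into a shared `quit` flag that the work fetchers poll. That flag is written and read from different goroutines without synchronization; a more idiomatic, race-free way to express the same shutdown is to cancel a `context.Context` from the signal handler. The sketch below is a self-contained illustration of that pattern, not code from this repo, and `runWorker` is a made-up stand-in for a work fetcher.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// runWorker is an illustrative stand-in for a work fetcher: it checks
// ctx.Done() instead of polling a shared quit flag.
func runWorker(ctx context.Context, name string) {
	for {
		select {
		case <-ctx.Done():
			fmt.Println(name, "stopping")
			return
		case <-time.After(time.Second):
			fmt.Println(name, "working")
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())

	sigterm := make(chan os.Signal, 1) // signal.Notify needs a buffered channel
	signal.Notify(sigterm, os.Interrupt, syscall.SIGTERM)
	go func() {
		<-sigterm
		cancel() // every worker observes ctx.Done()
	}()

	go runWorker(ctx, "worker-1")
	go runWorker(ctx, "worker-2")

	<-ctx.Done()
	time.Sleep(100 * time.Millisecond) // give workers a moment to print their exit message
}
```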
diff --git a/snippets.sql b/snippets.sql
index 9c65114..9cfb1a5 100644
--- a/snippets.sql
+++ b/snippets.sql
@@ -1,17 +1,17 @@
--- SELECT for the index-ready materialized view "fresh"
-select distinct on (infohash) trackerdata.infohash, torrent.name, torrent.length, trackerdata.seeders, trackerdata.leechers, trackerdata.completed, tracker from trackerdata inner join torrent on (trackerdata.infohash = torrent.infohash) where torrent.copyrighted != 't' order by infohash, scraped asc, seeders desc;
--- to create this view, run: create materialized view AS (paste from above)
-
+-- see what trackers are the most used
 select sum(seeders),tracker from trackerdata group by tracker;
 
-get highest seed/leech count found for every torrent, using data from tracker with most seeds
-select distinct on (infohash) infohash, seeders, leechers, completed, tracker from trackerdata where completed != 0 order by infohash, scraped asc, seeders desc;
-
-
-
-generate top 100 by seeders:
-select * from (select distinct on (trackerdata.infohash) trackerdata.infohash, torrent.name, seeders, leechers from trackerdata inner join torrent on (trackerdata.infohash = torrent.infohash) where completed != 0 order by infohash, seeders desc) as subquery order by seeders desc limit 100;
+-- generate top 100 by seeders:
+SELECT torrent.name, fresh.* from fresh INNER JOIN torrent ON torrent.infohash = fresh.infohash ORDER BY s desc limit 100;
 
 SELECT added::date, count(infohash) from torrent where added > '2019-01-15'::date
-group by added::date order by count desc;
\ No newline at end of file
+group by added::date order by count desc;
+
+CREATE MATERIALIZED VIEW fresh AS
+    SELECT infohash,
+           max(seeders) AS s,
+           max(leechers) AS l,
+           max(completed) AS c
+    FROM trackerdata
+    GROUP BY infohash;
\ No newline at end of file
diff --git a/tracker-scraper/deploy.sh b/tracker-scraper/deploy.sh
deleted file mode 100755
index b4b2c44..0000000
--- a/tracker-scraper/deploy.sh
+++ /dev/null
@@ -1,3 +0,0 @@
-go build
-scp ./tracker-scraper user@server:/home/nextgen/
-ssh user@server sudo -u nextgen /home/nextgen/tracker-scraper
\ No newline at end of file
diff --git a/tracker-scraper/tracker-scraper.go b/tracker-scraper/tracker-scraper.go
deleted file mode 100644
index eb98bc8..0000000
--- a/tracker-scraper/tracker-scraper.go
+++ /dev/null
@@ -1,166 +0,0 @@
-package main
-
-import (
-	"database/sql"
-	"log"
-	"strconv"
-	"sync/atomic"
-	"time"
-
-	"github.com/etix/goscrape"
-	_ "github.com/lib/pq"
-)
-
-const retryLimit = 3
-const waitTime = 250 // in ms
-var trackers = [3]string{"udp://tracker.coppersurfer.tk:6969", "udp://exodus.desync.com:6969", "udp://tracker.pirateparty.gr:6969"}
-
-func main() {
-	db := initDb()
-	trackerResponses := make(chan trackerResponse, 100)
-	trackerRequests := make(map[string]chan []string)
-
-	var counter uint64 //count of torrents scraped
-
-	quitCounter := make(chan bool)
-	go func() {
-		for {
-			select {
-			case <-quitCounter:
-				return
-			default:
-				log.Println("Torrents scraped so far: " + strconv.Itoa(int(atomic.LoadUint64(&counter))))
-				time.Sleep(2 * time.Second)
-			}
-		}
-	}()
-
-	for _, tracker := range trackers {
-		trackerRequests[tracker] = make(chan []string, 1000)
-		go runScraper(trackerRequests[tracker], trackerResponses, tracker, waitTime, &counter)
-	}
-
-	datestamp := time.Now().Local().Format("2006-01-02")
-
-	go runPersister(trackerResponses, db, datestamp)
-
-	rows, err := db.Query("SELECT infohash FROM torrent WHERE NOT EXISTS (SELECT FROM trackerdata WHERE infohash = torrent.infohash AND scraped = '" + datestamp + "')")
-	if err != nil {
-		log.Fatal(err)
-	}
-	defer rows.Close()
-	var infohashes []string
-	for rows.Next() {
-		var infohash string
-		if err := rows.Scan(&infohash); err != nil {
-			log.Fatal(err)
-		}
-		if len(infohashes) < 74 {
-			infohashes = append(infohashes, infohash)
-		} else {
-			for _, tracker := range trackers {
-				trackerRequests[tracker] <- infohashes
-			}
-			infohashes = []string{}
-		}
-	}
-	for _, tracker := range trackers {
-		trackerRequests[tracker] <- infohashes
-	}
-
-	quitCounter <- true
-
-	for len(trackerRequests) > 0 {
-		time.Sleep(2 * time.Second)
-		var left int
-		for i, tracker := range trackers {
-			left = len(trackerRequests[tracker])
-			if left != 0 {
-				log.Println("Tracker " + strconv.Itoa(i) + " requests left to send: " + strconv.Itoa(len(trackerRequests[tracker])))
-			}
-		}
-	}
-
-	for _, tracker := range trackers {
-		close(trackerRequests[tracker])
-	}
-
-	for len(trackerResponses) > 0 {
-		time.Sleep(2 * time.Second)
-		log.Println("Tracker responses left to save: " + strconv.Itoa(len(trackerResponses)))
-	}
-	time.Sleep(time.Duration(waitTime*retryLimit) * time.Millisecond)
-	close(trackerResponses)
-}
-
-//Runs a tracker that scrapes the given tracker. Takes requests from trackerRequests and sends responses to trackerResponses
-//waittime is in miliseconds
-func runScraper(trackerRequests chan []string, trackerResponses chan trackerResponse, tracker string, waittime int, counter *uint64) {
-	s, err := goscrape.New(tracker)
-	s.SetTimeout(time.Duration(waitTime) * time.Millisecond)
-	s.SetRetryLimit(retryLimit)
-	if err != nil {
-		log.Fatal("Error:", err)
-	}
-	for req := range trackerRequests {
-		infohashes := make([][]byte, len(req))
-		for i, v := range req {
-			if len(v) != 40 { //infohashes are 40 chars long in string representation.
-				panic("Infohash in trackerRequest with index " + strconv.Itoa(i) + " isn't 40 chars long, it's " + strconv.Itoa(len(v)) + " long.")
-			}
-			infohashes[i] = []byte(v)
-		}
-		res, err := s.Scrape(infohashes...)
-		if err != nil {
-			log.Println(err)
-		} else {
-			atomic.AddUint64(counter, uint64(len(infohashes)))
-			trackerResponses <- trackerResponse{tracker, res}
-		}
-
-		time.Sleep(time.Duration(waittime) * time.Millisecond)
-	}
-}
-
-func runPersister(trackerResponses chan trackerResponse, db *sql.DB, datestamp string) {
-	for res := range trackerResponses {
-		for _, scrapeResult := range res.scrapeResult {
-			_, err := db.Exec("INSERT INTO trackerdata (infohash, tracker, seeders, leechers, completed, scraped) VALUES ($1, $2, $3, $4, $5, $6)", scrapeResult.Infohash, res.tracker, scrapeResult.Seeders, scrapeResult.Leechers, scrapeResult.Completed, datestamp)
-			if err != nil {
-				log.Fatal(err)
-			}
-		}
-	}
-}
-
-func initDb() *sql.DB {
-	connStr := "user=nextgen dbname=nextgen host=/var/run/postgresql"
-	db, err := sql.Open("postgres", connStr)
-	if err != nil {
-		log.Fatal(err)
-	}
-
-	/*_, err = db.Exec(`CREATE TYPE tracker AS ENUM ('udp://tracker.coppersurfer.tk:6969', 'udp://exodus.desync.com:6969', 'udp://tracker.pirateparty.gr:6969')`)
-	if err != nil {
-		log.Fatal(err)
-	}*/
-
-	_, err = db.Exec(`CREATE TABLE IF NOT EXISTS trackerdata (
-		infohash char(40),
-		tracker tracker,
-		seeders int NOT NULL,
-		leechers int NOT NULL,
-		completed int NOT NULL,
-		scraped char(10),
-		PRIMARY KEY (infohash, scraped, tracker)
-	)`)
-	if err != nil {
-		log.Fatal(err)
-	}
-	return db
-}
-
-type trackerResponse struct {
-	tracker      string
-	scrapeResult []*goscrape.ScrapeResult
-}
-echo "Scraping trackers for seed/leech data" -mosh nextgen@server "~/tracker-scraper" # you can use ssh instead of mosh aswell - +echo "Refreshing database" ssh nextgen@server "psql -c 'REFRESH MATERIALIZED VIEW fresh'" - -echo "Generating index dump" +echo "Downloading dump" rm index-generator/dump.csv -ssh nextgen@server "psql -c '\copy (select * from fresh) to stdout with (format csv)'" > index-generator/dump.csv - +ssh nextgen@server "psql -c '\copy (select fresh.infohash, torrent.name, torrent.length, fresh.s, fresh.l, fresh.c from fresh inner join torrent on torrent.infohash = fresh.infohash) to stdout with (format csv)'" > index-generator/dump.csv +echo "Generating index" (cd index-generator; node --max-old-space-size=10000 main.js) python3 index-generator/fix-metajson.py website/generated/inx - +echo "Generating top torrents list" generate-top-torrents/generate-top-torrents > website/generated/top.json - echo "Uploading website" cd website rsync -ar ./ root@server:/www/torrent-paradise.ml # consider using --progress \ No newline at end of file