You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
379 lines
7.6 KiB
379 lines
7.6 KiB
package main |
|
|
|
import ( |
|
"git.sp4ke.com/sp4ke/hugobot/v3/export" |
|
"git.sp4ke.com/sp4ke/hugobot/v3/feeds" |
|
"git.sp4ke.com/sp4ke/hugobot/v3/static" |
|
"git.sp4ke.com/sp4ke/hugobot/v3/utils" |
|
"errors" |
|
"log" |
|
"math" |
|
"path" |
|
"time" |
|
|
|
gum "git.sp4ke.com/sp4ke/gum.git" |
|
"github.com/syndtr/goleveldb/leveldb" |
|
|
|
"github.com/beeker1121/goque" |
|
) |
|
|
|
const ( |
|
StaticDataExportInterval = 3600 * time.Second |
|
JobSchedulerInterval = 60 * time.Second |
|
QDataDir = "./.data" |
|
MaxQueueJobs = 100 |
|
MaxSerialJob = 100 |
|
|
|
// Used in JobPool to avoid duplicate jobs for the same feed |
|
MapFeedJobFile = "map_feed_job" |
|
) |
|
|
|
var ( |
|
SourcePriorityRange = Range{Min: 0, Max: 336 * time.Hour.Seconds()} // 2 weeks |
|
TargetPriorityRange = Range{Min: 0, Max: math.MaxUint8} |
|
|
|
SchedulerUpdates = make(chan *Job) |
|
) |
|
|
|
type Range struct { |
|
Min float64 |
|
Max float64 |
|
} |
|
|
|
func (r Range) Val() float64 { |
|
return r.Max - r.Min |
|
} |
|
|
|
// JobPool schedluer. Priodically schedule new jobs |
|
type Scheduler struct { |
|
jobs *JobPool |
|
jobUpdates chan *Job |
|
serialJobs chan *Job |
|
} |
|
|
|
func serialRun(inputJobs <-chan *Job) { |
|
for { |
|
j := <-inputJobs |
|
log.Printf("serial run %v", j) |
|
j.Handle() |
|
} |
|
} |
|
|
|
func (s *Scheduler) Run(m gum.UnitManager) { |
|
go serialRun(s.serialJobs) // These jobs run in series |
|
|
|
jobTimer := time.NewTicker(JobSchedulerInterval) |
|
staticExportTimer := time.NewTicker(StaticDataExportInterval) |
|
|
|
for { |
|
select { |
|
case <-jobTimer.C: |
|
log.Println("job heartbeat !") |
|
|
|
j, _ := s.jobs.Peek() |
|
if j != nil { |
|
log.Printf("peeking job: %s\n", j) |
|
} |
|
|
|
// If max pool jobs reached clean the pool |
|
if s.jobs.Length() >= MaxQueueJobs { |
|
s.jobs.Drop() |
|
s.panicAndShutdown(m) |
|
return |
|
} |
|
|
|
s.updateJobs() |
|
|
|
// Spawn job works |
|
s.dispatchJobs() |
|
|
|
case job := <-s.jobUpdates: |
|
log.Printf("job update recieved: %s", JobStatusMap[job.Status]) |
|
|
|
switch job.Status { |
|
|
|
case JobStatusDone: |
|
log.Println("Job is done, removing from feedJobMap. New jobs for this feed can be added now.") |
|
|
|
// Remove job from feedJobMap |
|
err := s.jobs.DeleteMarkedJob(job) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
// Create export job for successful fetch jobs |
|
if job.JobType == JobTypeFetch { |
|
log.Println("Creating an export job") |
|
expJob, err := NewExportJob(job.Feed, 0) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
//log.Printf("export job: %+v", expJob) |
|
log.Printf("export job: %s\n", expJob) |
|
|
|
err = s.jobs.Enqueue(expJob) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
} |
|
|
|
case JobStatusFailed: |
|
//TODO: Store all failed jobs somewhere |
|
err := s.jobs.DeleteMarkedJob(job) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
log.Printf("Job %s failed with error: %s", job.Feed.Name, job.Err) |
|
} |
|
|
|
case <-staticExportTimer.C: |
|
log.Println("-------- export tick --------") |
|
|
|
log.Println("Exporting static data ...") |
|
err := static.HugoExportData() |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
log.Println("Exporting weeks ...") |
|
err = export.ExportWeeks() |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
log.Println("Exporting btc ...") |
|
err = export.ExportBTCAddresses() |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
case <-m.ShouldStop(): |
|
s.Shutdown() |
|
m.Done() |
|
|
|
} |
|
} |
|
} |
|
|
|
func (s *Scheduler) panicAndShutdown(m gum.UnitManager) { |
|
|
|
//err := s.jobs.Drop() |
|
//if err != nil { |
|
//log.Fatal(err) |
|
//} |
|
//TODO |
|
m.Panic(errors.New("max job queue exceeded")) |
|
|
|
s.Shutdown() |
|
|
|
m.Done() |
|
} |
|
|
|
func (s *Scheduler) Shutdown() { |
|
// Flush ongoing jobs back to job queue |
|
|
|
iter := s.jobs.feedJobMap.NewIterator(nil, nil) |
|
var markedDelete [][]byte |
|
for iter.Next() { |
|
key := iter.Key() |
|
log.Printf("Putting job %s back to queue", key) |
|
value := iter.Value() |
|
//log.Println("value ", value) |
|
job, err := JobFromBytes(value) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
err = s.jobs.Enqueue(job) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
markedDelete = append(markedDelete, key) |
|
|
|
} |
|
iter.Release() |
|
err := iter.Error() |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
for _, k := range markedDelete { |
|
|
|
err := s.jobs.feedJobMap.Delete(k, nil) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
} |
|
|
|
// Close jobpool queue |
|
err = s.jobs.Close() |
|
if err != nil { |
|
panic(err) |
|
} |
|
|
|
} |
|
|
|
// Dispatchs all jobs in the job pool to task workers |
|
func (s *Scheduler) dispatchJobs() { |
|
//log.Println("dispatching ...") |
|
jobsLength := int(s.jobs.Length()) |
|
for i := 0; i < jobsLength; i++ { |
|
log.Printf("Dequeing %d/%d", i, s.jobs.Length()) |
|
|
|
j, err := s.jobs.Dequeue() |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
log.Println("Dispatching ", j) |
|
|
|
if j.Serial { |
|
s.serialJobs <- j |
|
} else { |
|
go j.Handle() |
|
} |
|
} |
|
|
|
} |
|
|
|
// Gets all available feeds and creates |
|
// a new Job if time.Now() - feed.last_refresh >= feed.interval |
|
func (s *Scheduler) updateJobs() { |
|
// |
|
// Get all feeds |
|
// |
|
// For each feed compare Now() vs last refresh |
|
// If now() - last_refresh >= refresh interval -> create job |
|
|
|
feeds, err := feeds.ListFeeds() |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
//log.Printf("updating jobs for %d feeds\n", len(feeds)) |
|
|
|
// Check all jobs |
|
for _, f := range feeds { |
|
//log.Printf("checking feed %s: %s\n", f.Name, f.Url) |
|
//log.Printf("Seconds since last refresh %f", time.Since(f.LastRefresh.Time).Seconds()) |
|
//log.Printf("Refresh interval %f", f.Interval) |
|
|
|
if delta, ok := f.ShouldRefresh(); ok { |
|
log.Printf("Refreshing %s -- %f seconds since last.", f.Name, delta) |
|
|
|
// If there is already a job with this feed id skip and return an empty job |
|
feedId := utils.IntToBytes(f.FeedID) |
|
|
|
_, err := s.jobs.feedJobMap.Get(feedId, nil) |
|
if err == nil { |
|
log.Println("Job already exists for feed") |
|
} else if err != leveldb.ErrNotFound { |
|
log.Fatal(err) |
|
} else { |
|
|
|
// Priority is based on the delta time since last refresh |
|
// bigger delta == higher priority |
|
|
|
// Convert priority to smaller range priority |
|
// We use original range of `0 - 1 month` in seconds |
|
// Target range is uint8 `0 - 255` |
|
prio := MapPriority(delta) |
|
|
|
job, err := NewFetchJob(f, prio) |
|
if err != nil { |
|
panic(err) |
|
} |
|
|
|
err = s.jobs.Enqueue(job) |
|
if err != nil { |
|
panic(err) |
|
} |
|
|
|
err = s.jobs.MarkUniqJob(job) |
|
if err != nil { |
|
panic(err) |
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
jLen := s.jobs.Length() |
|
if jLen > 0 { |
|
log.Printf("jobs length = %+v\n", jLen) |
|
} |
|
} |
|
|
|
func NewScheduler() gum.WorkUnit { |
|
// Priority queue for jobs |
|
q, err := goque.OpenPriorityQueue(QDataDir, goque.DESC) |
|
if err != nil { |
|
panic(err) |
|
} |
|
|
|
// map[FEED_ID][JOB] |
|
// Used to avoid duplicate jobs in the queue for the same feed |
|
feedJobMapDB, err := leveldb.OpenFile(path.Join(QDataDir, MapFeedJobFile), nil) |
|
if err != nil { |
|
panic(err) |
|
} |
|
|
|
jobPool := &JobPool{ |
|
Q: q, |
|
maxJobs: MaxQueueJobs, |
|
feedJobMap: feedJobMapDB, |
|
} |
|
|
|
// Restore all ongoing jobs in feedJobMap to the pool |
|
iter := feedJobMapDB.NewIterator(nil, nil) |
|
var markedDelete [][]byte |
|
for iter.Next() { |
|
key := iter.Key() |
|
value := iter.Value() |
|
|
|
job, err := JobFromBytes(value) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
log.Printf("Restoring uncomplete job %s back to job queue", job.ID) |
|
|
|
err = jobPool.Enqueue(job) |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
markedDelete = append(markedDelete, key) |
|
} |
|
|
|
iter.Release() |
|
err = iter.Error() |
|
if err != nil { |
|
log.Fatal(err) |
|
} |
|
|
|
serialJobs := make(chan *Job, MaxSerialJob) |
|
|
|
return &Scheduler{ |
|
|
|
jobs: jobPool, |
|
jobUpdates: SchedulerUpdates, |
|
serialJobs: serialJobs, |
|
} |
|
} |
|
|
|
func NotifyScheduler(job *Job) { |
|
SchedulerUpdates <- job |
|
} |
|
|
|
func MapPriority(val float64) uint8 { |
|
newVal := (((val - SourcePriorityRange.Min) * TargetPriorityRange.Val()) / |
|
SourcePriorityRange.Val()) + TargetPriorityRange.Min |
|
|
|
return uint8(newVal) |
|
}
|
|
|