package dataset

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"os"
	"sync"
	"time"

	"git.maze.io/maze/styx/logger"
)

// updaterHTTPTimeout bounds every HTTP request (including reading the body)
// made by a list updater, so a stalled server cannot block a job forever.
const updaterHTTPTimeout = time.Minute

// updaterHTTPClient is the HTTP client shared by all updater jobs.
var updaterHTTPClient = &http.Client{Timeout: updaterHTTPTimeout}

// Updater polls storage for lists and keeps one background updaterJob running
// for every enabled list, stopping jobs whose list is no longer enabled.
type Updater struct {
	storage   Storage
	lists     sync.Map // map[int64]List
	updaters  sync.Map // map[int64]*updaterJob
	done      chan struct{}
	closeOnce sync.Once // guards close(done) against repeated/concurrent Close
}

// NewUpdater returns an Updater backed by storage and starts its background
// refresh loop. Call Close to stop the loop and all running jobs.
func NewUpdater(storage Storage) *Updater {
	u := &Updater{
		storage: storage,
		done:    make(chan struct{}),
	}
	go u.refresh()
	return u
}

// Close signals the refresh loop to stop, which in turn stops every running
// updater job. It is safe to call more than once and from multiple
// goroutines; it always returns nil.
func (u *Updater) Close() error {
	u.closeOnce.Do(func() { close(u.done) })
	return nil
}

// refresh runs until the Updater is closed, reconciling the running jobs
// against storage once per second.
func (u *Updater) refresh() {
	check := time.NewTicker(time.Second)
	defer check.Stop()

	log := logger.StandardLog
	for {
		select {
		case <-u.done:
			log.Debug("Updater closing, stopping updaters...")
			u.updaters.Range(func(_, value any) bool {
				if job, ok := value.(*updaterJob); ok && job != nil {
					close(job.done)
				}
				return true
			})
			return
		case now := <-check.C:
			u.check(now, log)
		}
	}
}

// check reconciles the running updater jobs with the lists in storage: it
// starts a job for every enabled list that has none and stops the job of
// every tracked list that is no longer enabled (or no longer returned).
//
// It returns -1 when the lists cannot be retrieved and 0 otherwise.
func (u *Updater) check(now time.Time, log logger.Structured) (wait time.Duration) {
	log.Trace("Checking lists")

	lists, err := u.storage.Lists()
	if err != nil {
		log.Err(err).Error("Updater can't retrieve lists")
		return -1
	}

	// Start by assuming every tracked list has gone away; enabled lists are
	// struck from this set below.
	missing := make(map[int64]bool)
	u.lists.Range(func(key, _ any) bool {
		log.Tracef("List %d has updater running", key)
		missing[key.(int64)] = true
		return true
	})

	for _, list := range lists {
		log.Tracef("List %d is active: %t", list.ID, list.IsEnabled)
		if !list.IsEnabled {
			continue
		}
		delete(missing, list.ID)
		if _, exists := u.lists.Load(list.ID); exists {
			continue
		}

		// Give the job its own copy: before Go 1.22 the range variable is
		// shared between iterations, so &list would alias across jobs.
		list := list
		u.lists.Store(list.ID, list)
		u.updaters.Store(list.ID, newUpdaterJob(u.storage, &list))
	}

	for id := range missing {
		log.Tracef("List %d has updater running, but is no longer active, reaping...", id)
		if updater, ok := u.updaters.LoadAndDelete(id); ok {
			close(updater.(*updaterJob).done)
		}
		// Forget the list as well, otherwise a list that is re-enabled later
		// is still considered tracked and never gets a new updater.
		u.lists.Delete(id)
	}
	return 0
}

// updaterJob periodically refreshes the cached contents of a single list.
type updaterJob struct {
	storage Storage
	list    *List
	done    chan struct{} // closed to stop the job
}

// newUpdaterJob creates a job for list and starts its update loop in a new
// goroutine. Close the job's done channel to stop it.
func newUpdaterJob(storage Storage, list *List) *updaterJob {
	job := &updaterJob{
		storage: storage,
		list:    list,
		done:    make(chan struct{}),
	}
	go job.loop()
	return job
}

// loop updates the list immediately and then once every list.Refresh until
// the job's done channel is closed. When an update changed the cached
// contents, the list is saved to storage.
func (job *updaterJob) loop() {
	log := logger.StandardLog.Values(logger.Values{
		"list": job.list.ID,
		"type": job.list.Type,
	})

	// time.NewTicker panics on a non-positive interval. In that case tick
	// stays nil; receiving from a nil channel blocks forever, so the list is
	// updated once and the loop then only waits for done.
	var tick <-chan time.Time
	if job.list.Refresh > 0 {
		ticker := time.NewTicker(job.list.Refresh)
		defer ticker.Stop()
		tick = ticker.C
	} else {
		log.Debug("List has no refresh interval, updating once")
	}

	// first fires exactly once, triggering the initial update right away.
	first := time.After(0)

	var now time.Time
	for {
		select {
		case <-job.done:
			log.Debug("List updater stopping")
			return
		case now = <-tick:
		case now = <-first:
		}

		log.Debug("List updater running")
		update, err := job.run(now)
		if err != nil {
			log.Err(err).Error("List updater failed")
			continue
		}
		if !update {
			continue
		}
		if err := job.storage.SaveList(job.list); err != nil {
			log.Err(err).Error("List updater save failed")
		}
	}
}

// run performs a single update of the list from its source. A source without
// a scheme, or with the "file" scheme, is read from the local file system;
// anything else is fetched over HTTP. It reports whether the cached contents
// changed. The now argument is currently unused.
func (job *updaterJob) run(now time.Time) (update bool, err error) {
	u, err := url.Parse(job.list.Source)
	if err != nil {
		return false, err
	}

	log := logger.StandardLog.Values(logger.Values{
		"list":   job.list.ID,
		"source": job.list.Source,
	})

	if u.Scheme == "" || u.Scheme == "file" {
		log.Debug("Updating list from file")
		return job.updateFile(u.Path)
	}

	log.Debug("Updating list from URL")
	return job.updateHTTP(u)
}

// updateFile reads the file called name and, if its contents differ from the
// list's cache, replaces the cache. It reports whether the cache changed.
func (job *updaterJob) updateFile(name string) (update bool, err error) {
	b, err := os.ReadFile(name)
	if err != nil {
		return false, err
	}
	if bytes.Equal(b, job.list.Cache) {
		return false, nil
	}
	job.list.Cache = b
	return true, nil
}

// updateHTTP downloads the list from location when shouldUpdateHTTP says the
// remote copy may be newer, and replaces the list's cache with the response
// body. It reports whether the cache was replaced.
//
// The cache is left untouched when the request fails, the server answers
// with anything other than 200 OK, or the body cannot be read completely.
func (job *updaterJob) updateHTTP(location *url.URL) (update bool, err error) {
	if update, err = job.shouldUpdateHTTP(location); err != nil || !update {
		return false, err
	}

	req, err := http.NewRequest(http.MethodGet, location.String(), nil)
	if err != nil {
		return false, err
	}
	res, err := updaterHTTPClient.Do(req)
	if err != nil {
		return false, err
	}
	defer res.Body.Close()

	// Without this check an error page would be stored as list contents.
	if res.StatusCode != http.StatusOK {
		return false, fmt.Errorf("fetching %s: unexpected HTTP status %s", location.Redacted(), res.Status)
	}

	// Read into a local first so a failed read cannot clobber the cache
	// with partial data.
	body, err := io.ReadAll(res.Body)
	if err != nil {
		return false, err
	}
	job.list.Cache = body
	return true, nil
}

// shouldUpdateHTTP reports whether the list should be downloaded again. It
// answers true when nothing is cached yet, when the server's Last-Modified
// header (obtained with a HEAD request) is later than list.UpdatedAt, or
// when the header is missing or cannot be parsed.
//
// NOTE(review): UpdatedAt is not modified anywhere in this file; presumably
// storage maintains it when the list is saved. Confirm against SaveList.
func (job *updaterJob) shouldUpdateHTTP(location *url.URL) (update bool, err error) {
	if len(job.list.Cache) == 0 {
		// Nothing cached, please update.
		return true, nil
	}

	req, err := http.NewRequest(http.MethodHead, location.String(), nil)
	if err != nil {
		return false, err
	}
	res, err := updaterHTTPClient.Do(req)
	if err != nil {
		return false, err
	}
	defer res.Body.Close()

	lastModified, parseErr := http.ParseTime(res.Header.Get("Last-Modified"))
	if parseErr != nil {
		// Not sure: no usable Last-Modified, so err on the side of updating.
		return true, nil
	}
	return lastModified.After(job.list.UpdatedAt), nil
}