notify.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114
  1. package rss
  2. import (
  3. "context"
  4. "net/http"
  5. "sync"
  6. "time"
  7. log "github.com/go-pkgz/lgr"
  8. "github.com/mmcdole/gofeed"
  9. "github.com/pkg/errors"
  10. )
  11. // Notify on RSS change
  12. type Notify struct {
  13. Feed string
  14. Duration time.Duration
  15. Timeout time.Duration
  16. once sync.Once
  17. ctx context.Context
  18. cancel context.CancelFunc
  19. }
  20. // Event from RSS
  21. type Event struct {
  22. ChanTitle string
  23. Title string
  24. Link string
  25. Text string
  26. guid string
  27. }
  28. // Go starts notifier and returns events channel
  29. func (n *Notify) Go(ctx context.Context) <-chan Event {
  30. log.Printf("[INFO] start notifier for %s, every %s", n.Feed, n.Duration)
  31. n.once.Do(func() { n.ctx, n.cancel = context.WithCancel(ctx) })
  32. ch := make(chan Event)
  33. // wait for duration, can be terminated by ctx
  34. waitOrCancel := func(ctx context.Context) bool {
  35. select {
  36. case <-ctx.Done():
  37. return false
  38. case <-time.After(n.Duration):
  39. return true
  40. }
  41. }
  42. go func() {
  43. defer func() {
  44. close(ch)
  45. n.cancel()
  46. }()
  47. fp := gofeed.NewParser()
  48. fp.Client = &http.Client{Timeout: n.Timeout}
  49. log.Printf("[DEBUG] notifier uses http timeout %v", n.Timeout)
  50. lastGUID := ""
  51. for {
  52. feedData, err := fp.ParseURL(n.Feed)
  53. if err != nil {
  54. log.Printf("[WARN] failed to fetch/parse url from %s, %v", n.Feed, err)
  55. if !waitOrCancel(n.ctx) {
  56. return
  57. }
  58. continue
  59. }
  60. event, err := n.feedEvent(feedData)
  61. if lastGUID != event.guid && err == nil {
  62. if lastGUID != "xyz" { // don't notify on initial change
  63. log.Printf("[INFO] new event %s - %s", event.guid, event.Title)
  64. ch <- event
  65. } else {
  66. log.Printf("[INFO] ignore first event %s - %s", event.guid, event.Title)
  67. }
  68. lastGUID = event.guid
  69. }
  70. if !waitOrCancel(n.ctx) {
  71. log.Print("[WARN] notifier canceled")
  72. return
  73. }
  74. }
  75. }()
  76. return ch
  77. }
  78. // Shutdown notifier
  79. func (n *Notify) Shutdown() {
  80. log.Print("[DEBUG] shutdown initiated")
  81. n.cancel()
  82. <-n.ctx.Done()
  83. }
  84. // feedEvent gets latest item from rss feed
  85. func (n *Notify) feedEvent(feed *gofeed.Feed) (e Event, err error) {
  86. if len(feed.Items) == 0 {
  87. return e, errors.New("no items in rss feed")
  88. }
  89. if feed.Items[0].GUID == "" {
  90. return e, errors.Errorf("no guid for rss entry %+v", feed.Items[0])
  91. }
  92. e.ChanTitle = feed.Title
  93. e.Title = feed.Items[0].Title
  94. e.Link = feed.Items[0].Link
  95. e.Text = feed.Items[0].Description
  96. e.guid = feed.Items[0].GUID
  97. return e, nil
  98. }