accounts_statuses_cleanup_scheduler.rb 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  # frozen_string_literal: true

  # Sidekiq worker that runs AccountStatusesCleanupService over enabled
  # AccountStatusesCleanupPolicy records, spending a bounded number of status
  # deletions per run. When a run exhausts its budget it records where it
  # stopped in Redis, and the next run resumes from that policy.
  class Scheduler::AccountsStatusesCleanupScheduler
    include Sidekiq::Worker
    include Redisable

    # This limit is mostly to be nice to the fediverse at large and not
    # generate too much traffic.
    # This also helps limit the running time of the scheduler itself.
    MAX_BUDGET = 300

    # This is an attempt to spread the load across remote servers, as
    # spreading deletions across diverse accounts is likely to spread
    # the deletions across diverse followers. It also helps each individual
    # user see some effect sooner.
    PER_ACCOUNT_BUDGET = 5

    # This is an attempt to limit the workload generated by status removal
    # jobs to something the particular server can handle.
    PER_THREAD_BUDGET = 5

    # These are latency limits (in seconds) on various queues above which a
    # server is considered to be under load, causing the auto-deletion to be
    # entirely skipped for that run.
    LOAD_LATENCY_THRESHOLDS = {
      default: 5,
      push: 10,
      # The `pull` queue has lower priority jobs, and it's unlikely that
      # pushing deletes would cause many issues with this queue if it didn't
      # cause issues with `default` and `push`. Yet, do not enqueue deletes
      # if the instance is lagging behind too much.
      pull: 5.minutes.to_i,
    }.freeze

    # Redis key holding the id of the policy at which a run exhausted its
    # budget, so that the following run can resume from it.
    LAST_POLICY_ID_KEY = 'account_statuses_cleanup_scheduler:last_policy_id'

    # Expiry (in seconds) of the resume point stored under LAST_POLICY_ID_KEY.
    LAST_POLICY_ID_TTL = 1.hour.to_i

    sidekiq_options retry: 0, lock: :until_executed, lock_ttl: 1.day.to_i

    # Entry point. Skips the run when `under_load?` is true or when there is no
    # budget. Otherwise it hands each eligible policy to
    # AccountStatusesCleanupService with a limit of at most PER_ACCOUNT_BUDGET.
    # Each returned count is subtracted from the run's budget, until the budget
    # is spent or no remaining policy yields deletions.
    def perform
      return if under_load?

      budget = compute_budget

      # Without any budget nothing can be deleted. Returning here avoids
      # querying policies, calling the cleanup service with a zero limit, and
      # overwriting the saved resume point with a policy that was never
      # actually processed.
      return unless budget.positive?

      # If the budget allows it, we want to consider all accounts with enabled
      # auto cleanup at least once.
      #
      # We start from `first_policy_id` (the last processed id in the previous
      # run) and process each policy until we loop to `first_policy_id`,
      # recording into `affected_policies` any policy that caused posts to be
      # deleted.
      #
      # After that, we set `full_iteration` to `false` and continue looping on
      # policies from `affected_policies`.
      first_policy_id = last_processed_id || 0
      first_iteration = true
      full_iteration = true
      affected_policies = []

      loop do
        num_processed_accounts = 0

        scope = cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
        scope.find_each(order: :asc) do |policy|
          num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
          budget -= num_deleted

          unless num_deleted.zero?
            num_processed_accounts += 1
            affected_policies << policy.id if full_iteration
          end

          # On the wrap-around pass, reaching an id at or past the starting
          # point means this pass has come back to where the first pass began.
          full_iteration = false if !first_iteration && policy.id >= first_policy_id

          # `<= 0` rather than `zero?`: a count larger than the requested limit
          # must not push the budget below zero and slip past this check.
          if budget <= 0
            save_last_processed_id(policy.id)
            break
          end
        end

        # The idea here is to loop through all policies at least once until the budget is exhausted
        # and start back after the last processed account otherwise
        break if budget <= 0 || (num_processed_accounts.zero? && !full_iteration)

        full_iteration = false unless first_iteration
        first_iteration = false
      end
    end

    # Returns the deletion budget for one run. It is PER_THREAD_BUDGET per
    # Sidekiq thread in processes serving the `push` queue, capped at
    # MAX_BUDGET. Returns 0 when no running process serves `push`.
    def compute_budget
      # Each post deletion is a `RemovalWorker` job (on `default` queue), each
      # potentially spawning many `ActivityPub::DeliveryWorker` jobs (on the `push` queue).
      threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.pluck('concurrency').sum
      [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
    end

    # Returns true when any queue listed in LOAD_LATENCY_THRESHOLDS currently
    # has a latency above its threshold.
    def under_load?
      LOAD_LATENCY_THRESHOLDS.any? { |queue, max_latency| queue_under_load?(queue, max_latency) }
    end

    private

    # Builds the relation of enabled policies to examine in one pass:
    # - first pass: ids from `first_policy_id` upward;
    # - wrap-around pass: ids up to `first_policy_id`, plus `affected_policies`;
    # - later passes: only `affected_policies`.
    def cleanup_policies(first_policy_id, affected_policies, first_iteration, full_iteration)
      scope = AccountStatusesCleanupPolicy.where(enabled: true)

      if full_iteration
        # If we are doing a full iteration, examine all policies we have not examined yet
        if first_iteration
          scope.where(id: first_policy_id...)
        else
          scope.where(id: ..first_policy_id).or(scope.where(id: affected_policies))
        end
      else
        # Otherwise, examine only policies that previously yielded posts to delete
        scope.where(id: affected_policies)
      end
    end

    # True when the Sidekiq queue `name` has a latency above `max_latency`.
    def queue_under_load?(name, max_latency)
      Sidekiq::Queue.new(name).latency > max_latency
    end

    # Returns the saved resume point as an Integer, or nil when none is stored.
    def last_processed_id
      redis.get(LAST_POLICY_ID_KEY)&.to_i
    end

    # Stores `id` as the resume point with a LAST_POLICY_ID_TTL expiry, or
    # deletes the stored resume point when `id` is nil.
    def save_last_processed_id(id)
      if id.nil?
        redis.del(LAST_POLICY_ID_KEY)
      else
        redis.set(LAST_POLICY_ID_KEY, id, ex: LAST_POLICY_ID_TTL)
      end
    end
  end