accounts_statuses_cleanup_scheduler.rb

# frozen_string_literal: true

class Scheduler::AccountsStatusesCleanupScheduler
  include Sidekiq::Worker
  include Redisable

  # This limit is mostly to be nice to the fediverse at large and not
  # generate too much traffic.
  # It also helps limit the running time of the scheduler itself.
  MAX_BUDGET = 50

  # This is an attempt to spread the load across instances, as different
  # accounts are likely to have different followers.
  PER_ACCOUNT_BUDGET = 5

  # This is an attempt to limit the workload generated by status removal
  # jobs to something the particular instance can handle.
  PER_THREAD_BUDGET = 5

  # These thresholds avoid adding work to an instance that is already under load.
  MAX_DEFAULT_SIZE    = 2
  MAX_DEFAULT_LATENCY = 5
  MAX_PUSH_SIZE       = 5
  MAX_PUSH_LATENCY    = 10

  # The 'pull' queue has lower-priority jobs, and pushing deletes is unlikely
  # to cause much of an issue for this queue if it does not cause issues for
  # 'default' and 'push'. Still, do not enqueue deletes if the instance is
  # lagging behind too much.
  MAX_PULL_SIZE    = 500
  MAX_PULL_LATENCY = 300

  # This is less of an issue in general, but deleting old statuses is likely
  # to cause delivery errors, which in turn increase the number of jobs to be
  # retried. This does not directly translate to load, but connection errors
  # and a high number of dead instances may cause it to spiral out of control
  # if left unchecked.
  MAX_RETRY_SIZE = 50_000

  # Never retry, and hold a uniqueness lock so that only one copy of this job
  # is scheduled or running at a time.
  sidekiq_options retry: 0, lock: :until_executed
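
  # Walk through all enabled cleanup policies, deleting statuses for each one
  # until the global budget is exhausted, and remember where processing stopped
  # so that the next run can resume from that point.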
  def perform
    return if under_load?

    budget          = compute_budget
    first_policy_id = last_processed_id

    loop do
      num_processed_accounts = 0

      scope = AccountStatusesCleanupPolicy.where(enabled: true)
      scope = scope.where(AccountStatusesCleanupPolicy.arel_table[:id].gt(first_policy_id)) if first_policy_id.present?
      scope.find_each(order: :asc) do |policy|
        num_deleted = AccountStatusesCleanupService.new.call(policy, [budget, PER_ACCOUNT_BUDGET].min)
        num_processed_accounts += 1 unless num_deleted.zero?
        budget -= num_deleted

        if budget.zero?
          save_last_processed_id(policy.id)
          break
        end
      end

      # The idea here is to loop through all policies at least once until the
      # budget is exhausted, and to start back after the last processed account
      # otherwise.
      break if budget.zero? || (num_processed_accounts.zero? && first_policy_id.nil?)

      first_policy_id = nil
    end
  end
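
  # The budget scales with the total concurrency of the Sidekiq processes
  # serving the 'push' queue. For example, two processes with a concurrency of
  # 25 each give threads == 50, so the budget is [5 * 50, 50].min == 50,
  # i.e. capped at MAX_BUDGET.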
  def compute_budget
    threads = Sidekiq::ProcessSet.new.select { |x| x['queues'].include?('push') }.map { |x| x['concurrency'] }.sum
    [PER_THREAD_BUDGET * threads, MAX_BUDGET].min
  end
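
  # Skip the run entirely if the retry set is very large or if any of the
  # relevant queues is backed up, as measured by both backlog size and latency.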
  def under_load?
    return true if Sidekiq::Stats.new.retry_size > MAX_RETRY_SIZE

    queue_under_load?('default', MAX_DEFAULT_SIZE, MAX_DEFAULT_LATENCY) || queue_under_load?('push', MAX_PUSH_SIZE, MAX_PUSH_LATENCY) || queue_under_load?('pull', MAX_PULL_SIZE, MAX_PULL_LATENCY)
  end

  private

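  # A queue counts as overloaded when its backlog exceeds max_size or when its
  # latency (the age, in seconds, of its oldest enqueued job) exceeds max_latency.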
  def queue_under_load?(name, max_size, max_latency)
    queue = Sidekiq::Queue.new(name)
    queue.size > max_size || queue.latency > max_latency
  end
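
  # The id of the last policy processed before the budget ran out, stored in
  # Redis and used as a cursor so that the next run resumes after that policy.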
  def last_processed_id
    redis.get('account_statuses_cleanup_scheduler:last_account_id')
  end
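
  # Persist or clear the cursor. The key expires after an hour, so a stale
  # cursor is eventually discarded.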
  def save_last_processed_id(id)
    if id.nil?
      redis.del('account_statuses_cleanup_scheduler:last_account_id')
    else
      redis.set('account_statuses_cleanup_scheduler:last_account_id', id, ex: 1.hour.seconds)
    end
  end
end