cli_helper.rb 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. # frozen_string_literal: true
  2. dev_null = Logger.new('/dev/null')
  3. Rails.logger = dev_null
  4. ActiveRecord::Base.logger = dev_null
  5. ActiveJob::Base.logger = dev_null
  6. HttpLog.configuration.logger = dev_null
  7. Paperclip.options[:log] = false
  8. Chewy.logger = dev_null
  9. module Mastodon
  10. module CLIHelper
  11. def dry_run?
  12. options[:dry_run]
  13. end
  14. def create_progress_bar(total = nil)
  15. ProgressBar.create(total: total, format: '%c/%u |%b%i| %e')
  16. end
  17. def reset_connection_pools!
  18. ActiveRecord::Base.establish_connection(ActiveRecord::Base.configurations[Rails.env].dup.tap { |config| config['pool'] = options[:concurrency] + 1 })
  19. RedisConfiguration.establish_pool(options[:concurrency])
  20. end
  21. def parallelize_with_progress(scope)
  22. if options[:concurrency] < 1
  23. say('Cannot run with this concurrency setting, must be at least 1', :red)
  24. exit(1)
  25. end
  26. reset_connection_pools!
  27. progress = create_progress_bar(scope.count)
  28. pool = Concurrent::FixedThreadPool.new(options[:concurrency])
  29. total = Concurrent::AtomicFixnum.new(0)
  30. aggregate = Concurrent::AtomicFixnum.new(0)
  31. scope.reorder(nil).find_in_batches do |items|
  32. futures = []
  33. items.each do |item|
  34. futures << Concurrent::Future.execute(executor: pool) do
  35. begin
  36. if !progress.total.nil? && progress.progress + 1 > progress.total
  37. # The number of items has changed between start and now,
  38. # since there is no good way to predict the final count from
  39. # here, just change the progress bar to an indeterminate one
  40. progress.total = nil
  41. end
  42. progress.log("Processing #{item.id}") if options[:verbose]
  43. Chewy.strategy(:mastodon) do
  44. result = ActiveRecord::Base.connection_pool.with_connection do
  45. yield(item)
  46. ensure
  47. RedisConfiguration.pool.checkin if Thread.current[:redis]
  48. Thread.current[:redis] = nil
  49. end
  50. aggregate.increment(result) if result.is_a?(Integer)
  51. end
  52. rescue => e
  53. progress.log pastel.red("Error processing #{item.id}: #{e}")
  54. ensure
  55. progress.increment
  56. end
  57. end
  58. end
  59. total.increment(items.size)
  60. futures.map(&:value)
  61. end
  62. progress.stop
  63. [total.value, aggregate.value]
  64. end
  65. def pastel
  66. @pastel ||= Pastel.new
  67. end
  68. end
  69. end