search.rb 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. # frozen_string_literal: true
  2. require_relative 'base'
  module Mastodon::CLI
    # Thor-style command group (a subclass of Base) for the search indices.
    # Currently exposes a single command, `deploy`.
    class Search < Base
      # Indices are sorted by amount of data to be expected in each, so that
      # smaller indices can go online sooner
      INDICES = [
        InstancesIndex,
        AccountsIndex,
        TagsIndex,
        PublicStatusesIndex,
        StatusesIndex,
      ].freeze

      option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads'
      option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch'
      option :only, type: :array, enum: %w(instances accounts tags statuses public_statuses), desc: 'Only process these indices'
      option :import, type: :boolean, default: true, desc: 'Import data from the database to the index'
      option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index'
      option :reset_chewy, type: :boolean, default: false, desc: "Reset Chewy's internal index"
      desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
      long_desc <<~LONG_DESC
        If Elasticsearch is empty, this command will create the necessary indices
        and then import data from the database into those indices.
        This command will also upgrade indices if the underlying schema has been
        changed since the last run. Index upgrades erase index data.
        Even if creating or upgrading indices is not necessary, data from the
        database will be imported into the indices, unless overridden with --no-import.
      LONG_DESC
      # Purges and re-locks every selected index whose specification changed,
      # then, per index, imports records (unless --no-import) and removes
      # outdated documents (unless --no-clean), reporting progress on a bar.
      #
      # Exits with status 1 (via verify_deploy_options!) when --concurrency or
      # --batch-size is below 1. Prints the indexed / de-indexed totals at the end.
      def deploy
        verify_deploy_options!

        # --only values are restricted by the option's enum, so they always map
        # onto one of the *Index classes; otherwise process every index.
        indices = if options[:only]
                    options[:only].map { |str| "#{str.camelize}Index".constantize }
                  else
                    INDICES
                  end

        # The pending-work queue is bounded to 10x the number of worker threads.
        pool      = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10)
        importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) }
        progress  = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)

        Chewy::Stash::Specification.reset! if options[:reset_chewy]

        # First, ensure all indices are created and have the correct
        # structure, so that live data can already be written
        indices.select { |index| index.specification.changed? }.each do |index|
          progress.title = "Upgrading #{index} "
          index.purge
          index.specification.lock!
        end

        # The bar's total is the sum of every importer's estimate.
        progress.title = 'Estimating workload '
        progress.total = indices.sum { |index| importers[index].estimate! }

        # Defined outside this file (presumably in Base) — NOTE(review): confirm
        # why it must run here, between estimation and the import phase.
        reset_connection_pools!

        added   = 0
        removed = 0

        indices.each do |index|
          importer = importers[index]
          importer.optimize_for_import!

          # NOTE(review): these callbacks may run on pool threads; confirm in the
          # importer whether `added`/`removed` need synchronization.
          importer.on_progress do |(indexed, deleted)|
            processed = indexed + deleted

            # The estimate can undershoot. When it does, switch the bar to an
            # unknown total. After that `progress.total` is nil, so the guard
            # avoids `Integer > nil` (ArgumentError), which would otherwise abort
            # every later callback and stop the counters below from updating.
            progress.total = nil if progress.total && progress.progress + processed > progress.total
            progress.progress += processed
            added   += indexed
            removed += deleted
          end

          importer.on_failure do |reason|
            progress.log(pastel.red("Error while importing #{index}: #{reason}"))
          end

          if options[:import]
            progress.title = "Importing #{index} "
            importer.import!
          end

          if options[:clean]
            progress.title = "Cleaning #{index} "
            importer.clean_up!
          end
        ensure
          # Always undo optimize_for_import!, even if importing/cleaning raised.
          # Safe navigation keeps a missing importer from masking the original error.
          importer&.optimize_for_search!
        end

        progress.title = 'Done! '
        progress.finish

        say("Indexed #{added} records, de-indexed #{removed}", :green, true)
      ensure
        # Release the worker threads. `pool` is nil if option verification
        # exited before it was created.
        pool&.shutdown
      end

      private

      # Runs every option check `deploy` depends on; each check exits the
      # process with status 1 on failure.
      def verify_deploy_options!
        verify_deploy_concurrency!
        verify_deploy_batch_size!
      end

      # Rejects a --concurrency value below 1 (the thread pool needs at least one worker).
      def verify_deploy_concurrency!
        return unless options[:concurrency] < 1

        say('Cannot run with this concurrency setting, must be at least 1', :red)
        exit(1)
      end

      # Rejects a --batch-size value below 1.
      def verify_deploy_batch_size!
        return unless options[:batch_size] < 1

        say('Cannot run with this batch_size setting, must be at least 1', :red)
        exit(1)
      end
    end
  end