# search_cli.rb
  1. # frozen_string_literal: true
  2. require_relative '../../config/boot'
  3. require_relative '../../config/environment'
  4. require_relative 'cli_helper'
  5. module Mastodon
  6. class SearchCLI < Thor
  7. include CLIHelper
  8. # Indices are sorted by amount of data to be expected in each, so that
  9. # smaller indices can go online sooner
  10. INDICES = [
  11. AccountsIndex,
  12. TagsIndex,
  13. StatusesIndex,
  14. ].freeze
  15. option :concurrency, type: :numeric, default: 5, aliases: [:c], desc: 'Workload will be split between this number of threads'
  16. option :batch_size, type: :numeric, default: 100, aliases: [:b], desc: 'Number of records in each batch'
  17. option :only, type: :array, enum: %w(accounts tags statuses), desc: 'Only process these indices'
  18. option :import, type: :boolean, default: true, desc: 'Import data from the database to the index'
  19. option :clean, type: :boolean, default: true, desc: 'Remove outdated documents from the index'
  20. desc 'deploy', 'Create or upgrade Elasticsearch indices and populate them'
  21. long_desc <<~LONG_DESC
  22. If Elasticsearch is empty, this command will create the necessary indices
  23. and then import data from the database into those indices.
  24. This command will also upgrade indices if the underlying schema has been
  25. changed since the last run. Index upgrades erase index data.
  26. Even if creating or upgrading indices is not necessary, data from the
  27. database will be imported into the indices, unless overridden with --no-import.
  28. LONG_DESC
  29. def deploy
  30. if options[:concurrency] < 1
  31. say('Cannot run with this concurrency setting, must be at least 1', :red)
  32. exit(1)
  33. end
  34. if options[:batch_size] < 1
  35. say('Cannot run with this batch_size setting, must be at least 1', :red)
  36. exit(1)
  37. end
  38. indices = begin
  39. if options[:only]
  40. options[:only].map { |str| "#{str.camelize}Index".constantize }
  41. else
  42. INDICES
  43. end
  44. end
  45. pool = Concurrent::FixedThreadPool.new(options[:concurrency], max_queue: options[:concurrency] * 10)
  46. importers = indices.index_with { |index| "Importer::#{index.name}Importer".constantize.new(batch_size: options[:batch_size], executor: pool) }
  47. progress = ProgressBar.create(total: nil, format: '%t%c/%u |%b%i| %e (%r docs/s)', autofinish: false)
  48. # First, ensure all indices are created and have the correct
  49. # structure, so that live data can already be written
  50. indices.select { |index| index.specification.changed? }.each do |index|
  51. progress.title = "Upgrading #{index} "
  52. index.purge
  53. index.specification.lock!
  54. end
  55. progress.title = 'Estimating workload '
  56. progress.total = indices.sum { |index| importers[index].estimate! }
  57. reset_connection_pools!
  58. added = 0
  59. removed = 0
  60. indices.each do |index|
  61. importer = importers[index]
  62. importer.optimize_for_import!
  63. importer.on_progress do |(indexed, deleted)|
  64. progress.total = nil if progress.progress + indexed + deleted > progress.total
  65. progress.progress += indexed + deleted
  66. added += indexed
  67. removed += deleted
  68. end
  69. importer.on_failure do |reason|
  70. progress.log(pastel.red("Error while importing #{index}: #{reason}"))
  71. end
  72. if options[:import]
  73. progress.title = "Importing #{index} "
  74. importer.import!
  75. end
  76. if options[:clean]
  77. progress.title = "Cleaning #{index} "
  78. importer.clean_up!
  79. end
  80. ensure
  81. importer.optimize_for_search!
  82. end
  83. progress.title = 'Done! '
  84. progress.finish
  85. say("Indexed #{added} records, de-indexed #{removed}", :green, true)
  86. end
  87. end
  88. end