Skip to content
Snippets Groups Projects
Unverified Commit 32a030dd authored by Claire's avatar Claire Committed by GitHub
Browse files

Rewrite import feature (#21054)

parent 0ad2413b
No related branches found
No related tags found
No related merge requests found
Showing
with 664 additions and 69 deletions
......@@ -65,6 +65,7 @@ Metrics/AbcSize:
Metrics/BlockLength:
CountAsOne: ['array', 'hash', 'heredoc', 'method_call']
Exclude:
- 'config/routes.rb'
- 'lib/mastodon/*_cli.rb'
- 'lib/tasks/*.rake'
- 'app/models/concerns/account_associations.rb'
......@@ -130,6 +131,7 @@ Metrics/ClassLength:
- 'app/services/activitypub/process_account_service.rb'
- 'app/services/activitypub/process_status_update_service.rb'
- 'app/services/backup_service.rb'
- 'app/services/bulk_import_service.rb'
- 'app/services/delete_account_service.rb'
- 'app/services/fan_out_on_write_service.rb'
- 'app/services/fetch_link_card_service.rb'
......
......@@ -741,6 +741,7 @@ RSpec/LetSetup:
- 'spec/controllers/following_accounts_controller_spec.rb'
- 'spec/controllers/oauth/authorized_applications_controller_spec.rb'
- 'spec/controllers/oauth/tokens_controller_spec.rb'
- 'spec/controllers/settings/imports_controller_spec.rb'
- 'spec/lib/activitypub/activity/delete_spec.rb'
- 'spec/lib/vacuum/preview_cards_vacuum_spec.rb'
- 'spec/models/account_spec.rb'
......@@ -755,6 +756,7 @@ RSpec/LetSetup:
- 'spec/services/activitypub/process_collection_service_spec.rb'
- 'spec/services/batched_remove_status_service_spec.rb'
- 'spec/services/block_domain_service_spec.rb'
- 'spec/services/bulk_import_service_spec.rb'
- 'spec/services/delete_account_service_spec.rb'
- 'spec/services/import_service_spec.rb'
- 'spec/services/notify_service_spec.rb'
......
# frozen_string_literal: true
require 'csv'
class Settings::ImportsController < Settings::BaseController
before_action :set_account
before_action :set_bulk_import, only: [:show, :confirm, :destroy]
before_action :set_recent_imports, only: [:index]
TYPE_TO_FILENAME_MAP = {
following: 'following_accounts_failures.csv',
blocking: 'blocked_accounts_failures.csv',
muting: 'muted_accounts_failures.csv',
domain_blocking: 'blocked_domains_failures.csv',
bookmarks: 'bookmarks_failures.csv',
}.freeze
TYPE_TO_HEADERS_MAP = {
following: ['Account address', 'Show boosts', 'Notify on new posts', 'Languages'],
blocking: false,
muting: ['Account address', 'Hide notifications'],
domain_blocking: false,
bookmarks: false,
}.freeze
def index
@import = Form::Import.new(current_account: current_account)
end
def show; end
def failures
@bulk_import = current_account.bulk_imports.where(state: :finished).find(params[:id])
respond_to do |format|
format.csv do
filename = TYPE_TO_FILENAME_MAP[@bulk_import.type.to_sym]
headers = TYPE_TO_HEADERS_MAP[@bulk_import.type.to_sym]
export_data = CSV.generate(headers: headers, write_headers: true) do |csv|
@bulk_import.rows.find_each do |row|
case @bulk_import.type.to_sym
when :following
csv << [row.data['acct'], row.data.fetch('show_reblogs', true), row.data.fetch('notify', false), row.data['languages']&.join(', ')]
when :blocking
csv << [row.data['acct']]
when :muting
csv << [row.data['acct'], row.data.fetch('hide_notifications', true)]
when :domain_blocking
csv << [row.data['domain']]
when :bookmarks
csv << [row.data['uri']]
end
end
end
def show
@import = Import.new
send_data export_data, filename: filename
end
end
end
def confirm
@bulk_import.update!(state: :scheduled)
BulkImportWorker.perform_async(@bulk_import.id)
redirect_to settings_imports_path, notice: I18n.t('imports.success')
end
def create
@import = Import.new(import_params)
@import.account = @account
@import = Form::Import.new(import_params.merge(current_account: current_account))
if @import.save
ImportWorker.perform_async(@import.id)
redirect_to settings_import_path, notice: I18n.t('imports.success')
redirect_to settings_import_path(@import.bulk_import.id)
else
render :show
# We need to set recent imports as we are displaying the index again
set_recent_imports
render :index
end
end
def destroy
@bulk_import.destroy!
redirect_to settings_imports_path
end
private
def set_account
@account = current_user.account
def import_params
params.require(:form_import).permit(:data, :type, :mode)
end
def import_params
params.require(:import).permit(:data, :type, :mode)
def set_bulk_import
@bulk_import = current_account.bulk_imports.where(state: :unconfirmed).find(params[:id])
end
def set_recent_imports
@recent_imports = current_account.bulk_imports.reorder(id: :desc).limit(10)
end
end
# frozen_string_literal: true
class Vacuum::ImportsVacuum
def perform
clean_unconfirmed_imports!
clean_old_imports!
end
private
def clean_unconfirmed_imports!
BulkImport.where(state: :unconfirmed).where('created_at <= ?', 10.minutes.ago).reorder(nil).in_batches.delete_all
end
def clean_old_imports!
BulkImport.where('created_at <= ?', 1.week.ago).reorder(nil).in_batches.delete_all
end
end
# frozen_string_literal: true
# == Schema Information
#
# Table name: bulk_imports
#
# id :bigint(8) not null, primary key
# type :integer not null
# state :integer not null
# total_items :integer default(0), not null
# imported_items :integer default(0), not null
# processed_items :integer default(0), not null
# finished_at :datetime
# overwrite :boolean default(FALSE), not null
# likely_mismatched :boolean default(FALSE), not null
# original_filename :string default(""), not null
# account_id :bigint(8) not null
# created_at :datetime not null
# updated_at :datetime not null
#
class BulkImport < ApplicationRecord
self.inheritance_column = false
belongs_to :account
has_many :rows, class_name: 'BulkImportRow', inverse_of: :bulk_import, dependent: :delete_all
enum type: {
following: 0,
blocking: 1,
muting: 2,
domain_blocking: 3,
bookmarks: 4,
}
enum state: {
unconfirmed: 0,
scheduled: 1,
in_progress: 2,
finished: 3,
}
validates :type, presence: true
def self.progress!(bulk_import_id, imported: false)
# Use `increment_counter` so that the incrementation is done atomically in the database
BulkImport.increment_counter(:processed_items, bulk_import_id) # rubocop:disable Rails/SkipsModelValidations
BulkImport.increment_counter(:imported_items, bulk_import_id) if imported # rubocop:disable Rails/SkipsModelValidations
# Since the incrementation has been done atomically, concurrent access to `bulk_import` is now bening
bulk_import = BulkImport.find(bulk_import_id)
bulk_import.update!(state: :finished, finished_at: Time.now.utc) if bulk_import.processed_items == bulk_import.total_items
end
end
# frozen_string_literal: true
# == Schema Information
#
# Table name: bulk_import_rows
#
# id :bigint(8) not null, primary key
# bulk_import_id :bigint(8) not null
# data :jsonb
# created_at :datetime not null
# updated_at :datetime not null
#
class BulkImportRow < ApplicationRecord
belongs_to :bulk_import
end
......@@ -68,5 +68,8 @@ module AccountAssociations
# Account statuses cleanup policy
has_one :statuses_cleanup_policy, class_name: 'AccountStatusesCleanupPolicy', inverse_of: :account, dependent: :destroy
# Imports
has_many :bulk_imports, inverse_of: :account, dependent: :delete_all
end
end
# frozen_string_literal: true
require 'csv'
# A non-ActiveRecord helper class for CSV uploads.
# Handles saving contents to database.
class Form::Import
include ActiveModel::Model
MODES = %i(merge overwrite).freeze
FILE_SIZE_LIMIT = 20.megabytes
ROWS_PROCESSING_LIMIT = 20_000
EXPECTED_HEADERS_BY_TYPE = {
following: ['Account address', 'Show boosts', 'Notify on new posts', 'Languages'],
blocking: ['Account address'],
muting: ['Account address', 'Hide notifications'],
domain_blocking: ['#domain'],
bookmarks: ['#uri'],
}.freeze
KNOWN_FIRST_HEADERS = EXPECTED_HEADERS_BY_TYPE.values.map(&:first).uniq.freeze
ATTRIBUTE_BY_HEADER = {
'Account address' => 'acct',
'Show boosts' => 'show_reblogs',
'Notify on new posts' => 'notify',
'Languages' => 'languages',
'Hide notifications' => 'hide_notifications',
'#domain' => 'domain',
'#uri' => 'uri',
}.freeze
class EmptyFileError < StandardError; end
attr_accessor :current_account, :data, :type, :overwrite, :bulk_import
validates :type, presence: true
validates :data, presence: true
validate :validate_data
def guessed_type
return :muting if csv_data.headers.include?('Hide notifications')
return :following if csv_data.headers.include?('Show boosts') || csv_data.headers.include?('Notify on new posts') || csv_data.headers.include?('Languages')
return :following if data.original_filename&.start_with?('follows') || data.original_filename&.start_with?('following_accounts')
return :blocking if data.original_filename&.start_with?('blocks') || data.original_filename&.start_with?('blocked_accounts')
return :muting if data.original_filename&.start_with?('mutes') || data.original_filename&.start_with?('muted_accounts')
return :domain_blocking if data.original_filename&.start_with?('domain_blocks') || data.original_filename&.start_with?('blocked_domains')
return :bookmarks if data.original_filename&.start_with?('bookmarks')
end
# Whether the uploaded CSV file seems to correspond to a different import type than the one selected
def likely_mismatched?
guessed_type.present? && guessed_type != type.to_sym
end
def save
return false unless valid?
ApplicationRecord.transaction do
now = Time.now.utc
@bulk_import = current_account.bulk_imports.create(type: type, overwrite: overwrite || false, state: :unconfirmed, original_filename: data.original_filename, likely_mismatched: likely_mismatched?)
nb_items = BulkImportRow.insert_all(parsed_rows.map { |row| { bulk_import_id: bulk_import.id, data: row, created_at: now, updated_at: now } }).length # rubocop:disable Rails/SkipsModelValidations
@bulk_import.update(total_items: nb_items)
end
end
def mode
overwrite ? :overwrite : :merge
end
def mode=(str)
self.overwrite = str.to_sym == :overwrite
end
private
def default_csv_header
case type.to_sym
when :following, :blocking, :muting
'Account address'
when :domain_blocking
'#domain'
when :bookmarks
'#uri'
end
end
def csv_data
return @csv_data if defined?(@csv_data)
csv_converter = lambda do |field, field_info|
case field_info.header
when 'Show boosts', 'Notify on new posts', 'Hide notifications'
ActiveModel::Type::Boolean.new.cast(field)
when 'Languages'
field&.split(',')&.map(&:strip)&.presence
when 'Account address'
field.strip.gsub(/\A@/, '')
when '#domain', '#uri'
field.strip
else
field
end
end
@csv_data = CSV.open(data.path, encoding: 'UTF-8', skip_blanks: true, headers: true, converters: csv_converter)
@csv_data.take(1) # Ensure the headers are read
raise EmptyFileError if @csv_data.headers == true
@csv_data = CSV.open(data.path, encoding: 'UTF-8', skip_blanks: true, headers: [default_csv_header], converters: csv_converter) unless KNOWN_FIRST_HEADERS.include?(@csv_data.headers&.first)
@csv_data
end
def csv_row_count
return @csv_row_count if defined?(@csv_row_count)
csv_data.rewind
@csv_row_count = csv_data.take(ROWS_PROCESSING_LIMIT + 2).count
end
def parsed_rows
csv_data.rewind
expected_headers = EXPECTED_HEADERS_BY_TYPE[type.to_sym]
csv_data.take(ROWS_PROCESSING_LIMIT + 1).map do |row|
row.to_h.slice(*expected_headers).transform_keys { |key| ATTRIBUTE_BY_HEADER[key] }
end
end
def validate_data
return if data.nil?
return errors.add(:data, I18n.t('imports.errors.too_large')) if data.size > FILE_SIZE_LIMIT
return errors.add(:data, I18n.t('imports.errors.incompatible_type')) unless csv_data.headers.include?(default_csv_header)
errors.add(:data, I18n.t('imports.errors.over_rows_processing_limit', count: ROWS_PROCESSING_LIMIT)) if csv_row_count > ROWS_PROCESSING_LIMIT
if type.to_sym == :following
base_limit = FollowLimitValidator.limit_for_account(current_account)
limit = base_limit
limit -= current_account.following_count unless overwrite
errors.add(:data, I18n.t('users.follow_limit_reached', limit: base_limit)) if csv_row_count > limit
end
rescue CSV::MalformedCSVError => e
errors.add(:data, I18n.t('imports.errors.invalid_csv_file', error: e.message))
rescue EmptyFileError
errors.add(:data, I18n.t('imports.errors.empty'))
end
end
......@@ -17,6 +17,9 @@
# overwrite :boolean default(FALSE), not null
#
# NOTE: This is a deprecated model, only kept to not break ongoing imports
# on upgrade. See `BulkImport` and `Form::Import` for its replacements.
class Import < ApplicationRecord
FILE_TYPES = %w(text/plain text/csv application/csv).freeze
MODES = %i(merge overwrite).freeze
......@@ -28,7 +31,6 @@ class Import < ApplicationRecord
enum type: { following: 0, blocking: 1, muting: 2, domain_blocking: 3, bookmarks: 4 }
validates :type, presence: true
validates_with ImportValidator, on: :create
has_attached_file :data
validates_attachment_content_type :data, content_type: FILE_TYPES
......
# frozen_string_literal: true
class BulkImportRowService
def call(row)
@account = row.bulk_import.account
@data = row.data
@type = row.bulk_import.type.to_sym
case @type
when :following, :blocking, :muting
target_acct = @data['acct']
target_domain = domain(target_acct)
@target_account = stoplight_wrap_request(target_domain) { ResolveAccountService.new.call(target_acct, { check_delivery_availability: true }) }
return false if @target_account.nil?
when :bookmarks
target_uri = @data['uri']
target_domain = Addressable::URI.parse(target_uri).normalized_host
@target_status = ActivityPub::TagManager.instance.uri_to_resource(target_uri, Status)
return false if @target_status.nil? && ActivityPub::TagManager.instance.local_uri?(target_uri)
@target_status ||= stoplight_wrap_request(target_domain) { ActivityPub::FetchRemoteStatusService.new.call(target_uri) }
return false if @target_status.nil?
end
case @type
when :following
FollowService.new.call(@account, @target_account, reblogs: @data['show_reblogs'], notify: @data['notify'], languages: @data['languages'])
when :blocking
BlockService.new.call(@account, @target_account)
when :muting
MuteService.new.call(@account, @target_account, notifications: @data['hide_notifications'])
when :bookmarks
return false unless StatusPolicy.new(@account, @target_status).show?
@account.bookmarks.find_or_create_by!(status: @target_status)
end
true
rescue ActiveRecord::RecordNotFound
false
end
def domain(uri)
domain = uri.is_a?(Account) ? uri.domain : uri.split('@')[1]
TagManager.instance.local_domain?(domain) ? nil : TagManager.instance.normalize_domain(domain)
end
def stoplight_wrap_request(domain, &block)
if domain.present?
Stoplight("source:#{domain}", &block)
.with_fallback { nil }
.with_threshold(1)
.with_cool_off_time(5.minutes.seconds)
.with_error_handler { |error, handle| error.is_a?(HTTP::Error) || error.is_a?(OpenSSL::SSL::SSLError) ? handle.call(error) : raise(error) }
.run
else
yield
end
end
end
# frozen_string_literal: true
class BulkImportService < BaseService
def call(import)
@import = import
@account = @import.account
case @import.type.to_sym
when :following
import_follows!
when :blocking
import_blocks!
when :muting
import_mutes!
when :domain_blocking
import_domain_blocks!
when :bookmarks
import_bookmarks!
end
@import.update!(state: :finished, finished_at: Time.now.utc) if @import.processed_items == @import.total_items
rescue
@import.update!(state: :finished, finished_at: Time.now.utc)
raise
end
private
def extract_rows_by_acct
local_domain_suffix = "@#{Rails.configuration.x.local_domain}"
@import.rows.to_a.index_by { |row| row.data['acct'].delete_suffix(local_domain_suffix) }
end
def import_follows!
rows_by_acct = extract_rows_by_acct
if @import.overwrite?
@account.following.find_each do |followee|
row = rows_by_acct.delete(followee.acct)
if row.nil?
UnfollowService.new.call(@account, followee)
else
row.destroy
@import.processed_items += 1
@import.imported_items += 1
# Since we're updating the settings of an existing relationship, we can safely call
# FollowService directly
FollowService.new.call(@account, followee, reblogs: row.data['show_reblogs'], notify: row.data['notify'], languages: row.data['languages'])
end
end
# Save pending infos due to `overwrite?` handling
@import.save!
end
Import::RowWorker.push_bulk(rows_by_acct.values) do |row|
[row.id]
end
end
def import_blocks!
rows_by_acct = extract_rows_by_acct
if @import.overwrite?
@account.blocking.find_each do |blocked_account|
row = rows_by_acct.delete(blocked_account.acct)
if row.nil?
UnblockService.new.call(@account, blocked_account)
else
row.destroy
@import.processed_items += 1
@import.imported_items += 1
BlockService.new.call(@account, blocked_account)
end
end
# Save pending infos due to `overwrite?` handling
@import.save!
end
Import::RowWorker.push_bulk(rows_by_acct.values) do |row|
[row.id]
end
end
def import_mutes!
rows_by_acct = extract_rows_by_acct
if @import.overwrite?
@account.muting.find_each do |muted_account|
row = rows_by_acct.delete(muted_account.acct)
if row.nil?
UnmuteService.new.call(@account, muted_account)
else
row.destroy
@import.processed_items += 1
@import.imported_items += 1
MuteService.new.call(@account, muted_account, notifications: row.data['hide_notifications'])
end
end
# Save pending infos due to `overwrite?` handling
@import.save!
end
Import::RowWorker.push_bulk(rows_by_acct.values) do |row|
[row.id]
end
end
def import_domain_blocks!
domains = @import.rows.map { |row| row.data['domain'] }
if @import.overwrite?
@account.domain_blocks.find_each do |domain_block|
domain = domains.delete(domain_block)
@account.unblock_domain!(domain_block.domain) if domain.nil?
end
end
@import.rows.delete_all
domains.each { |domain| @account.block_domain!(domain) }
@import.update!(processed_items: @import.total_items, imported_items: @import.total_items)
AfterAccountDomainBlockWorker.push_bulk(domains) do |domain|
[@account.id, domain]
end
end
def import_bookmarks!
rows_by_uri = @import.rows.index_by { |row| row.data['uri'] }
if @import.overwrite?
@account.bookmarks.includes(:status).find_each do |bookmark|
row = rows_by_uri.delete(ActivityPub::TagManager.instance.uri_for(bookmark.status))
if row.nil?
bookmark.destroy!
else
row.destroy
@import.processed_items += 1
@import.imported_items += 1
end
end
# Save pending infos due to `overwrite?` handling
@import.save!
end
Import::RowWorker.push_bulk(rows_by_uri.values) do |row|
[row.id]
end
end
end
......@@ -2,6 +2,9 @@
require 'csv'
# NOTE: This is a deprecated service, only kept to not break ongoing imports
# on upgrade. See `BulkImportService` for its replacement.
class ImportService < BaseService
ROWS_PROCESSING_LIMIT = 20_000
......
# frozen_string_literal: true
require 'csv'
class ImportValidator < ActiveModel::Validator
KNOWN_HEADERS = [
'Account address',
'#domain',
'#uri',
].freeze
def validate(import)
return if import.type.blank? || import.data.blank?
# We parse because newlines could be part of individual rows. This
# runs on create so we should be reading the local file here before
# it is uploaded to object storage or moved anywhere...
csv_data = CSV.parse(import.data.queued_for_write[:original].read)
row_count = csv_data.size
row_count -= 1 if KNOWN_HEADERS.include?(csv_data.first&.first)
import.errors.add(:data, I18n.t('imports.errors.over_rows_processing_limit', count: ImportService::ROWS_PROCESSING_LIMIT)) if row_count > ImportService::ROWS_PROCESSING_LIMIT
case import.type
when 'following'
validate_following_import(import, row_count)
end
rescue CSV::MalformedCSVError
import.errors.add(:data, :malformed)
end
private
def validate_following_import(import, row_count)
base_limit = FollowLimitValidator.limit_for_account(import.account)
limit = if import.overwrite?
base_limit
else
base_limit - import.account.following_count
end
import.errors.add(:data, I18n.t('users.follow_limit_reached', limit: base_limit)) if row_count > limit
end
end
- content_for :page_title do
= t('settings.import')
= simple_form_for @import, url: settings_imports_path do |f|
.field-group
= f.input :type, as: :grouped_select, collection: { constructive: %i(following bookmarks), destructive: %i(muting blocking domain_blocking) }, wrapper: :with_block_label, include_blank: false, label_method: ->(type) { I18n.t("imports.types.#{type}") }, group_label_method: ->(group) { I18n.t("imports.type_groups.#{group.first}") }, group_method: :last, hint: t('imports.preface')
.fields-row
.fields-group.fields-row__column.fields-row__column-6
= f.input :data, wrapper: :with_block_label, hint: t('simple_form.hints.imports.data')
.fields-group.fields-row__column.fields-row__column-6
= f.input :mode, as: :radio_buttons, collection: Import::MODES, label_method: ->(mode) { safe_join([I18n.t("imports.modes.#{mode}"), content_tag(:span, I18n.t("imports.modes.#{mode}_long"), class: 'hint')]) }, collection_wrapper_tag: 'ul', item_wrapper_tag: 'li'
.actions
= f.button :button, t('imports.upload'), type: :submit
- unless @recent_imports.empty?
%hr.spacer/
%h3= t('imports.recent_imports')
.table-wrapper
%table.table
%thead
%tr
%th= t('imports.type')
%th= t('imports.status')
%th= t('imports.imported')
%th= t('imports.time_started')
%th= t('imports.failures')
%tbody
- @recent_imports.each do |import|
%tr
%td= t("imports.types.#{import.type}")
%td
- if import.unconfirmed?
= link_to t("imports.states.#{import.state}"), settings_import_path(import)
- else
= t("imports.states.#{import.state}")
%td
#{import.imported_items} / #{import.total_items}
%td= l(import.created_at)
%td
- num_failed = import.processed_items - import.imported_items
- if num_failed.positive?
- if import.finished?
= link_to num_failed, failures_settings_import_path(import, format: 'csv')
- else
= num_failed
- content_for :page_title do
= t('settings.import')
= t("imports.titles.#{@bulk_import.type.to_s}")
= simple_form_for @import, url: settings_import_path do |f|
.field-group
= f.input :type, collection: Import.types.keys, wrapper: :with_block_label, include_blank: false, label_method: lambda { |type| I18n.t("imports.types.#{type}") }, hint: t('imports.preface')
- if @bulk_import.likely_mismatched?
.flash-message.warning= t("imports.mismatched_types_warning")
.fields-row
.fields-group.fields-row__column.fields-row__column-6
= f.input :data, wrapper: :with_block_label, hint: t('simple_form.hints.imports.data')
.fields-group.fields-row__column.fields-row__column-6
= f.input :mode, as: :radio_buttons, collection: Import::MODES, label_method: lambda { |mode| safe_join([I18n.t("imports.modes.#{mode}"), content_tag(:span, I18n.t("imports.modes.#{mode}_long"), class: 'hint')]) }, collection_wrapper_tag: 'ul', item_wrapper_tag: 'li'
- if @bulk_import.overwrite?
%p.hint= t("imports.overwrite_preambles.#{@bulk_import.type.to_s}_html", filename: @bulk_import.original_filename, total_items: @bulk_import.total_items)
- else
%p.hint= t("imports.preambles.#{@bulk_import.type.to_s}_html", filename: @bulk_import.original_filename, total_items: @bulk_import.total_items)
.simple_form
.actions
= f.button :button, t('imports.upload'), type: :submit
= link_to t('generic.cancel'), settings_import_path(@bulk_import), method: :delete, class: 'button button-tertiary'
= link_to t('generic.confirm'), confirm_settings_import_path(@bulk_import), method: :post, class: 'button'
# frozen_string_literal: true
class BulkImportWorker
include Sidekiq::Worker
sidekiq_options queue: 'pull', retry: false
def perform(import_id)
import = BulkImport.find(import_id)
import.update!(state: :in_progress)
BulkImportService.new.call(import)
end
end
# frozen_string_literal: true
# NOTE: This is a deprecated worker, only kept to not break ongoing imports
# on upgrade. See `Import::RowWorker` for its replacement.
class Import::RelationshipWorker
include Sidekiq::Worker
......
# frozen_string_literal: true
class Import::RowWorker
include Sidekiq::Worker
sidekiq_options queue: 'pull', retry: 6, dead: false
sidekiq_retries_exhausted do |msg, _exception|
ActiveRecord::Base.connection_pool.with_connection do
# Increment the total number of processed items, and bump the state of the import if needed
bulk_import_id = BulkImportRow.where(id: msg['args'][0]).pick(:id)
BulkImport.progress!(bulk_import_id) unless bulk_import_id.nil?
end
end
def perform(row_id)
row = BulkImportRow.eager_load(bulk_import: :account).find_by(id: row_id)
return true if row.nil?
imported = BulkImportRowService.new.call(row)
mark_as_processed!(row, imported)
end
private
def mark_as_processed!(row, imported)
bulk_import_id = row.bulk_import_id
row.destroy! if imported
BulkImport.progress!(bulk_import_id, imported: imported)
end
end
# frozen_string_literal: true
# NOTE: This is a deprecated worker, only kept to not break ongoing imports
# on upgrade. See `ImportWorker` for its replacement.
class ImportWorker
include Sidekiq::Worker
......
......@@ -23,6 +23,7 @@ class Scheduler::VacuumScheduler
backups_vacuum,
access_tokens_vacuum,
feeds_vacuum,
imports_vacuum,
]
end
......@@ -50,6 +51,10 @@ class Scheduler::VacuumScheduler
Vacuum::FeedsVacuum.new
end
def imports_vacuum
Vacuum::ImportsVacuum.new
end
def content_retention_policy
ContentRetentionPolicy.current
end
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment