Skip to content

Commit 4181caa

Browse files
ClearlyClairechrisguida
authored andcommitted
Fix processing of remote Delete activities (mastodon#16084)
* Add tests * Ensure deleted statuses are marked as such * Save some redis memory by not storing URIs in delete_upon_arrival values * Avoid possible race condition when processing incoming Deletes * Avoid potential duplicate Delete forwards * Lower lock durations to reduce issues in case of hard crash of the Rails process * Check for `lock.aquired?` and improve comment * Refactor RedisLock usage in app/lib/activitypub * Fix using incorrect or non-existent sender for relaying Deletes
1 parent ee97ce4 commit 4181caa

6 files changed

Lines changed: 91 additions & 79 deletions

File tree

app/lib/activitypub/activity.rb

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,7 @@ def delete_arrived_first?(uri)
144144
end
145145

146146
def delete_later!(uri)
147-
redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, uri)
147+
redis.setex("delete_upon_arrival:#{@account.id}:#{uri}", 6.hours.seconds, true)
148148
end
149149

150150
def status_from_object
@@ -210,12 +210,22 @@ def fetch_remote_original_status
210210
end
211211
end
212212

213-
def lock_or_return(key, expire_after = 7.days.seconds)
213+
def lock_or_return(key, expire_after = 2.hours.seconds)
214214
yield if redis.set(key, true, nx: true, ex: expire_after)
215215
ensure
216216
redis.del(key)
217217
end
218218

219+
def lock_or_fail(key)
220+
RedisLock.acquire({ redis: Redis.current, key: key }) do |lock|
221+
if lock.acquired?
222+
yield
223+
else
224+
raise Mastodon::RaceConditionError
225+
end
226+
end
227+
end
228+
219229
def fetch?
220230
!@options[:delivery]
221231
end

app/lib/activitypub/activity/announce.rb

Lines changed: 14 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,29 +4,25 @@ class ActivityPub::Activity::Announce < ActivityPub::Activity
44
def perform
55
return reject_payload! if delete_arrived_first?(@json['id']) || !related_to_local_activity?
66

7-
RedisLock.acquire(lock_options) do |lock|
8-
if lock.acquired?
9-
original_status = status_from_object
7+
lock_or_fail("announce:#{@object['id']}") do
8+
original_status = status_from_object
109

11-
return reject_payload! if original_status.nil? || !announceable?(original_status)
10+
return reject_payload! if original_status.nil? || !announceable?(original_status)
1211

13-
@status = Status.find_by(account: @account, reblog: original_status)
12+
@status = Status.find_by(account: @account, reblog: original_status)
1413

15-
return @status unless @status.nil?
14+
return @status unless @status.nil?
1615

17-
@status = Status.create!(
18-
account: @account,
19-
reblog: original_status,
20-
uri: @json['id'],
21-
created_at: @json['published'],
22-
override_timestamps: @options[:override_timestamps],
23-
visibility: visibility_from_audience
24-
)
16+
@status = Status.create!(
17+
account: @account,
18+
reblog: original_status,
19+
uri: @json['id'],
20+
created_at: @json['published'],
21+
override_timestamps: @options[:override_timestamps],
22+
visibility: visibility_from_audience
23+
)
2524

26-
distribute(@status)
27-
else
28-
raise Mastodon::RaceConditionError
29-
end
25+
distribute(@status)
3026
end
3127

3228
@status
@@ -69,8 +65,4 @@ def requested_through_relay?
6965
def reblog_of_local_status?
7066
status_from_uri(object_uri)&.account&.local?
7167
end
72-
73-
def lock_options
74-
{ redis: Redis.current, key: "announce:#{@object['id']}" }
75-
end
7668
end

app/lib/activitypub/activity/create.rb

Lines changed: 10 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -45,19 +45,15 @@ def message_franking
4545
def create_status
4646
return reject_payload! if unsupported_object_type? || invalid_origin?(object_uri) || tombstone_exists? || !related_to_local_activity?
4747

48-
RedisLock.acquire(lock_options) do |lock|
49-
if lock.acquired?
50-
return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator
48+
lock_or_fail("create:#{object_uri}") do
49+
return if delete_arrived_first?(object_uri) || poll_vote? # rubocop:disable Lint/NonLocalExitFromIterator
5150

52-
@status = find_existing_status
51+
@status = find_existing_status
5352

54-
if @status.nil?
55-
process_status
56-
elsif @options[:delivered_to_account_id].present?
57-
postprocess_audience_and_deliver
58-
end
59-
else
60-
raise Mastodon::RaceConditionError
53+
if @status.nil?
54+
process_status
55+
elsif @options[:delivered_to_account_id].present?
56+
postprocess_audience_and_deliver
6157
end
6258
end
6359

@@ -313,13 +309,9 @@ def poll_vote!
313309
poll = replied_to_status.preloadable_poll
314310
already_voted = true
315311

316-
RedisLock.acquire(poll_lock_options) do |lock|
317-
if lock.acquired?
318-
already_voted = poll.votes.where(account: @account).exists?
319-
poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
320-
else
321-
raise Mastodon::RaceConditionError
322-
end
312+
lock_or_fail("vote:#{replied_to_status.poll_id}:#{@account.id}") do
313+
already_voted = poll.votes.where(account: @account).exists?
314+
poll.votes.create!(account: @account, choice: poll.options.index(@object['name']), uri: object_uri)
323315
end
324316

325317
increment_voters_count! unless already_voted
@@ -508,12 +500,4 @@ def increment_voters_count!
508500
poll.reload
509501
retry
510502
end
511-
512-
def lock_options
513-
{ redis: Redis.current, key: "create:#{object_uri}" }
514-
end
515-
516-
def poll_lock_options
517-
{ redis: Redis.current, key: "vote:#{replied_to_status.poll_id}:#{@account.id}" }
518-
end
519503
end

app/lib/activitypub/activity/delete.rb

Lines changed: 33 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,35 @@ def delete_person
2020
def delete_note
2121
return if object_uri.nil?
2222

23-
unless invalid_origin?(object_uri)
24-
RedisLock.acquire(lock_options) { |_lock| delete_later!(object_uri) }
25-
Tombstone.find_or_create_by(uri: object_uri, account: @account)
26-
end
23+
lock_or_return("delete_status_in_progress:#{object_uri}", 5.minutes.seconds) do
24+
unless invalid_origin?(object_uri)
25+
# This lock ensures a concurrent `ActivityPub::Activity::Create` either
26+
# does not create a status at all, or has finished saving it to the
27+
# database before we try to load it.
28+
# Without the lock, `delete_later!` could be called after `delete_arrived_first?`
29+
# and `Status.find` before `Status.create!`
30+
lock_or_fail("create:#{object_uri}") { delete_later!(object_uri) }
2731

28-
@status = Status.find_by(uri: object_uri, account: @account)
29-
@status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?
32+
Tombstone.find_or_create_by(uri: object_uri, account: @account)
33+
end
3034

31-
return if @status.nil?
35+
@status = Status.find_by(uri: object_uri, account: @account)
36+
@status ||= Status.find_by(uri: @object['atomUri'], account: @account) if @object.is_a?(Hash) && @object['atomUri'].present?
3237

33-
if @status.distributable?
34-
forward_for_reply
35-
forward_for_reblogs
36-
end
38+
return if @status.nil?
3739

38-
delete_now!
40+
forward! if @json['signature'].present? && @status.distributable?
41+
delete_now!
42+
end
3943
end
4044

41-
def forward_for_reblogs
42-
return if @json['signature'].blank?
43-
44-
rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
45-
inboxes = Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes - [@account.preferred_inbox_url]
45+
def rebloggers_ids
46+
return @rebloggers_ids if defined?(@rebloggers_ids)
47+
@rebloggers_ids = @status.reblogs.includes(:account).references(:account).merge(Account.local).pluck(:account_id)
48+
end
4649

47-
ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
48-
[payload, rebloggers_ids.first, inbox_url]
49-
end
50+
def inboxes_for_reblogs
51+
Account.where(id: ::Follow.where(target_account_id: rebloggers_ids).select(:account_id)).inboxes
5052
end
5153

5254
def replied_to_status
@@ -58,13 +60,19 @@ def reply_to_local?
5860
!replied_to_status.nil? && replied_to_status.account.local?
5961
end
6062

61-
def forward_for_reply
62-
return unless @json['signature'].present? && reply_to_local?
63+
def inboxes_for_reply
64+
replied_to_status.account.followers.inboxes
65+
end
66+
67+
def forward!
68+
inboxes = inboxes_for_reblogs
69+
inboxes += inboxes_for_reply if reply_to_local?
70+
inboxes -= [@account.preferred_inbox_url]
6371

64-
inboxes = replied_to_status.account.followers.inboxes - [@account.preferred_inbox_url]
72+
sender_id = reply_to_local? ? replied_to_status.account_id : rebloggers_ids.first
6573

66-
ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes) do |inbox_url|
67-
[payload, replied_to_status.account_id, inbox_url]
74+
ActivityPub::LowPriorityDeliveryWorker.push_bulk(inboxes.uniq) do |inbox_url|
75+
[payload, sender_id, inbox_url]
6876
end
6977
end
7078

@@ -75,8 +83,4 @@ def delete_now!
7583
def payload
7684
@payload ||= Oj.dump(@json)
7785
end
78-
79-
def lock_options
80-
{ redis: Redis.current, key: "create:#{object_uri}" }
81-
end
8286
end

app/services/remove_status_service.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,8 @@ def call(status, **options)
1616
@account = status.account
1717
@options = options
1818

19+
@status.discard
20+
1921
RedisLock.acquire(lock_options) do |lock|
2022
if lock.acquired?
2123
remove_from_self if @account.local?

spec/lib/activitypub/activity/delete_spec.rb

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,4 +49,24 @@
4949
end
5050
end
5151
end
52+
53+
context 'when the status has been reported' do
54+
describe '#perform' do
55+
subject { described_class.new(json, sender) }
56+
let!(:reporter) { Fabricate(:account) }
57+
58+
before do
59+
reporter.reports.create!(target_account: status.account, status_ids: [status.id], forwarded: false)
60+
subject.perform
61+
end
62+
63+
it 'marks the status as deleted' do
64+
expect(Status.find_by(id: status.id)).to be_nil
65+
end
66+
67+
it 'actually keeps a copy for inspection' do
68+
expect(Status.with_discarded.find_by(id: status.id)).to_not be_nil
69+
end
70+
end
71+
end
5272
end

0 commit comments

Comments
 (0)