Skip to content

Commit 24bcfd7

Browse files
authored
Harden Emby sync against partial failures (#5356) (#5423)
1 parent 19d7f04 commit 24bcfd7

7 files changed

Lines changed: 528 additions & 112 deletions

File tree

src/Ombi.Helpers/OmbiQuartz.cs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,20 @@ public class OmbiQuartz
2626
/// </summary>
2727
public static OmbiQuartz Instance => _instance ?? (_instance = new OmbiQuartz());
2828

29-
protected OmbiQuartz()
29+
protected OmbiQuartz() : this(true)
3030
{
31-
Init();
31+
}
32+
33+
/// <summary>
34+
/// Allows tests to skip the async scheduler initialisation and supply their own scheduler,
35+
/// since Init() runs async void and would otherwise race with (and overwrite) a test scheduler.
36+
/// </summary>
37+
protected OmbiQuartz(bool init)
38+
{
39+
if (init)
40+
{
41+
Init();
42+
}
3243
}
3344

3445
private async void Init()
@@ -86,21 +97,40 @@ public async Task AddJob<T>(string name, string group, string cronExpression, Di
8697
}
8798

8899
public static async Task TriggerJob(string jobName, string group)
100+
{
101+
await TriggerJobIfNotRunning(jobName, group);
102+
}
103+
104+
/// <summary>
105+
/// Triggers the job unless it is already running. Returns whether the job was triggered,
106+
/// so callers that depend on the job running (e.g. a resync after wiping data) can react.
107+
/// </summary>
108+
public static async Task<bool> TriggerJobIfNotRunning(string jobName, string group, IDictionary<string, object> data = null)
89109
{
90110
await _semaphore.WaitAsync();
91111

92112
try
93113
{
94-
if (!(await IsJobRunning(jobName)))
114+
if (await IsJobRunning(jobName))
115+
{
116+
return false;
117+
}
118+
119+
if (data != null)
120+
{
121+
await Scheduler.TriggerJob(new JobKey(jobName, group), new JobDataMap(data));
122+
}
123+
else
95124
{
96125
await Scheduler.TriggerJob(new JobKey(jobName, group));
97126
}
127+
128+
return true;
98129
}
99130
finally
100131
{
101132
_semaphore.Release();
102133
}
103-
104134
}
105135

106136

Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
using System.Collections.Generic;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
using Moq;
5+
using Moq.AutoMock;
6+
using NUnit.Framework;
7+
using Ombi.Api.External.MediaServers.Emby;
8+
using Ombi.Api.External.MediaServers.Emby.Models;
9+
using Ombi.Api.External.MediaServers.Emby.Models.Media.Tv;
10+
using Ombi.Api.External.MediaServers.Emby.Models.Movie;
11+
using Ombi.Core.Services;
12+
using Ombi.Core.Settings;
13+
using Ombi.Core.Settings.Models.External;
14+
using Ombi.Hubs;
15+
using Ombi.Schedule.Jobs.Emby;
16+
using Quartz;
17+
18+
namespace Ombi.Schedule.Tests
19+
{
20+
[TestFixture]
21+
public class EmbyContentSyncTests
22+
{
23+
private AutoMocker _mocker;
24+
private EmbyContentSync _subject;
25+
private Mock<IEmbyApi> _api;
26+
private Mock<IJobExecutionContext> _context;
27+
28+
[SetUp]
29+
public void Setup()
30+
{
31+
_mocker = new AutoMocker();
32+
33+
_api = new Mock<IEmbyApi>();
34+
_mocker.Setup<IEmbyApiFactory, IEmbyApi>(x => x.CreateClient(It.IsAny<EmbySettings>()))
35+
.Returns(_api.Object);
36+
37+
_mocker.Setup<ISettingsService<EmbySettings>, Task<EmbySettings>>(x => x.GetSettingsAsync())
38+
.ReturnsAsync(new EmbySettings
39+
{
40+
Enable = true,
41+
Servers = new List<EmbyServers>
42+
{
43+
new EmbyServers
44+
{
45+
Name = "Test",
46+
ApiKey = "key",
47+
AdministratorId = "admin",
48+
Ip = "localhost",
49+
Port = 8096
50+
}
51+
}
52+
});
53+
54+
_mocker.Setup<IFeatureService, Task<bool>>(x => x.FeatureEnabled(It.IsAny<string>()))
55+
.ReturnsAsync(false);
56+
57+
_context = new Mock<IJobExecutionContext>();
58+
_context.Setup(x => x.MergedJobDataMap).Returns(new JobDataMap());
59+
60+
var scheduler = new Mock<IScheduler>();
61+
scheduler.Setup(x => x.GetCurrentlyExecutingJobs(It.IsAny<CancellationToken>()))
62+
.ReturnsAsync(new List<IJobExecutionContext>());
63+
_ = new QuartzMock(scheduler);
64+
65+
_subject = _mocker.CreateInstance<EmbyContentSync>();
66+
}
67+
68+
[Test]
69+
public async Task TvSync_EmptyPageWithRemainingRecords_StopsInsteadOfRefetchingForever()
70+
{
71+
SetupMovies(totalRecordCount: 0);
72+
SetupShows(totalRecordCount: 50);
73+
74+
await _subject.Execute(_context.Object);
75+
76+
_api.Verify(x => x.GetAllShows(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()), Times.Once);
77+
_mocker.Verify<INotificationHubService>(x => x.SendNotificationToAdmins("Emby Content Sync Finished", It.IsAny<CancellationToken>()), Times.Once);
78+
}
79+
80+
[Test]
81+
public async Task MovieSync_EmptyPageWithRemainingRecords_StopsInsteadOfRefetchingForever()
82+
{
83+
SetupMovies(totalRecordCount: 50);
84+
SetupShows(totalRecordCount: 0);
85+
86+
await _subject.Execute(_context.Object);
87+
88+
_api.Verify(x => x.GetAllMovies(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()), Times.Once);
89+
_mocker.Verify<INotificationHubService>(x => x.SendNotificationToAdmins("Emby Content Sync Finished", It.IsAny<CancellationToken>()), Times.Once);
90+
}
91+
92+
private void SetupMovies(int totalRecordCount)
93+
{
94+
_api.Setup(x => x.GetAllMovies(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()))
95+
.ReturnsAsync(new EmbyItemContainer<EmbyMovie>
96+
{
97+
TotalRecordCount = totalRecordCount,
98+
Items = new List<EmbyMovie>()
99+
});
100+
}
101+
102+
private void SetupShows(int totalRecordCount)
103+
{
104+
_api.Setup(x => x.GetAllShows(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()))
105+
.ReturnsAsync(new EmbyItemContainer<EmbySeries>
106+
{
107+
TotalRecordCount = totalRecordCount,
108+
Items = new List<EmbySeries>()
109+
});
110+
}
111+
}
112+
}
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading;
5+
using System.Threading.Tasks;
6+
using Moq;
7+
using Moq.AutoMock;
8+
using NUnit.Framework;
9+
using Ombi.Api.External.MediaServers.Emby;
10+
using Ombi.Api.External.MediaServers.Emby.Models;
11+
using Ombi.Api.External.MediaServers.Emby.Models.Media.Tv;
12+
using Ombi.Core.Settings;
13+
using Ombi.Core.Settings.Models.External;
14+
using Ombi.Hubs;
15+
using Ombi.Schedule.Jobs.Emby;
16+
using Ombi.Store.Entities;
17+
using Ombi.Store.Repository;
18+
using Quartz;
19+
20+
namespace Ombi.Schedule.Tests
21+
{
22+
[TestFixture]
23+
public class EmbyEpisodeSyncTests
24+
{
25+
private AutoMocker _mocker;
26+
private EmbyEpisodeSync _subject;
27+
private Mock<IEmbyApi> _api;
28+
private Mock<IJobExecutionContext> _context;
29+
private List<EmbyEpisode> _addedEpisodes;
30+
31+
[SetUp]
32+
public void Setup()
33+
{
34+
_mocker = new AutoMocker();
35+
36+
_api = new Mock<IEmbyApi>();
37+
_mocker.Setup<IEmbyApiFactory, IEmbyApi>(x => x.CreateClient(It.IsAny<EmbySettings>()))
38+
.Returns(_api.Object);
39+
40+
SetupServers(Server("server1"));
41+
42+
// AddRange is called with a live HashSet that the sync clears afterwards,
43+
// so snapshot its contents at call time rather than inspecting it at verify time
44+
_addedEpisodes = new List<EmbyEpisode>();
45+
_mocker.GetMock<IEmbyContentRepository>()
46+
.Setup(x => x.AddRange(It.IsAny<IEnumerable<IMediaServerEpisode>>()))
47+
.Callback<IEnumerable<IMediaServerEpisode>>(e => _addedEpisodes.AddRange(e.Cast<EmbyEpisode>()))
48+
.Returns(Task.CompletedTask);
49+
50+
_mocker.Setup<IEmbyContentRepository, Task<HashSet<string>>>(x => x.GetAllSeriesEmbyIds())
51+
.ReturnsAsync(new HashSet<string> { "series1" });
52+
_mocker.Setup<IEmbyContentRepository, Task<Dictionary<string, (int EpisodeNumber, int SeasonNumber)>>>(x => x.GetAllEpisodeMetadata())
53+
.ReturnsAsync(new Dictionary<string, (int EpisodeNumber, int SeasonNumber)>());
54+
55+
_context = new Mock<IJobExecutionContext>();
56+
_context.Setup(x => x.MergedJobDataMap).Returns(new JobDataMap());
57+
58+
var scheduler = new Mock<IScheduler>();
59+
scheduler.Setup(x => x.GetCurrentlyExecutingJobs(It.IsAny<CancellationToken>()))
60+
.ReturnsAsync(new List<IJobExecutionContext>());
61+
_ = new QuartzMock(scheduler);
62+
63+
_subject = _mocker.CreateInstance<EmbyEpisodeSync>();
64+
}
65+
66+
[Test]
67+
public async Task EpisodeWithNullProviderIds_IsStillAdded()
68+
{
69+
_api.Setup(x => x.GetAllEpisodes(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()))
70+
.ReturnsAsync(Page(Episode("ep1")));
71+
72+
await _subject.Execute(_context.Object);
73+
74+
Assert.That(_addedEpisodes, Has.Count.EqualTo(1));
75+
Assert.That(_addedEpisodes.Single().EmbyId, Is.EqualTo("ep1"));
76+
Assert.That(_addedEpisodes.Single().TvDbId, Is.Null);
77+
_mocker.Verify<INotificationHubService>(x => x.SendNotificationToAdmins("Emby Episode Sync Failed", It.IsAny<CancellationToken>()), Times.Never);
78+
_mocker.Verify<INotificationHubService>(x => x.SendNotificationToAdmins("Emby Episode Sync Finished", It.IsAny<CancellationToken>()), Times.Once);
79+
}
80+
81+
[Test]
82+
public async Task EmptyPageWithRemainingRecords_StopsInsteadOfRefetchingForever()
83+
{
84+
_api.Setup(x => x.GetAllEpisodes(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()))
85+
.ReturnsAsync(new EmbyItemContainer<EmbyEpisodes> { TotalRecordCount = 100, Items = new List<EmbyEpisodes>() });
86+
87+
await _subject.Execute(_context.Object);
88+
89+
_api.Verify(x => x.GetAllEpisodes(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()), Times.Once);
90+
_mocker.Verify<INotificationHubService>(x => x.SendNotificationToAdmins("Emby Episode Sync Finished", It.IsAny<CancellationToken>()), Times.Once);
91+
}
92+
93+
[Test]
94+
public async Task ServerThrowing_SendsFailureNotification_AndDoesNotThrow()
95+
{
96+
_api.Setup(x => x.GetAllEpisodes(It.IsAny<string>(), It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()))
97+
.ThrowsAsync(new InvalidOperationException("Emby returned something we cannot handle"));
98+
99+
await _subject.Execute(_context.Object);
100+
101+
_mocker.Verify<INotificationHubService>(x => x.SendNotificationToAdmins("Emby Episode Sync Failed", It.IsAny<CancellationToken>()), Times.Once);
102+
_mocker.Verify<INotificationHubService>(x => x.SendNotificationToAdmins("Emby Episode Sync Finished", It.IsAny<CancellationToken>()), Times.Once);
103+
}
104+
105+
[Test]
106+
public async Task OneServerFailing_DoesNotStopTheOtherServers()
107+
{
108+
SetupServers(Server("badServer"), Server("goodServer"));
109+
_api.Setup(x => x.GetAllEpisodes("badServer", It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()))
110+
.ThrowsAsync(new InvalidOperationException("Emby returned something we cannot handle"));
111+
_api.Setup(x => x.GetAllEpisodes("goodServer", It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()))
112+
.ReturnsAsync(Page(Episode("ep1")));
113+
114+
await _subject.Execute(_context.Object);
115+
116+
_api.Verify(x => x.GetAllEpisodes("goodServer", It.IsAny<string>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<string>(), It.IsAny<string>()), Times.Once);
117+
Assert.That(_addedEpisodes.Select(x => x.EmbyId), Does.Contain("ep1"));
118+
_mocker.Verify<INotificationHubService>(x => x.SendNotificationToAdmins("Emby Episode Sync Failed", It.IsAny<CancellationToken>()), Times.Once);
119+
_mocker.Verify<INotificationHubService>(x => x.SendNotificationToAdmins("Emby Episode Sync Finished", It.IsAny<CancellationToken>()), Times.Once);
120+
}
121+
122+
private void SetupServers(params EmbyServers[] servers)
123+
{
124+
_mocker.Setup<ISettingsService<EmbySettings>, Task<EmbySettings>>(x => x.GetSettingsAsync())
125+
.ReturnsAsync(new EmbySettings
126+
{
127+
Enable = true,
128+
Servers = servers.ToList()
129+
});
130+
}
131+
132+
private static EmbyServers Server(string apiKey) => new EmbyServers
133+
{
134+
Name = apiKey,
135+
ApiKey = apiKey,
136+
AdministratorId = "admin",
137+
Ip = "localhost",
138+
Port = 8096
139+
};
140+
141+
private static EmbyItemContainer<EmbyEpisodes> Page(params EmbyEpisodes[] episodes) => new EmbyItemContainer<EmbyEpisodes>
142+
{
143+
TotalRecordCount = episodes.Length,
144+
Items = episodes.ToList()
145+
};
146+
147+
private static EmbyEpisodes Episode(string id, int episodeNumber = 1, int seasonNumber = 1) => new EmbyEpisodes
148+
{
149+
Id = id,
150+
Name = $"Episode {id}",
151+
SeriesId = "series1",
152+
SeriesName = "Series",
153+
IndexNumber = episodeNumber,
154+
ParentIndexNumber = seasonNumber,
155+
ProviderIds = null
156+
};
157+
}
158+
}

0 commit comments

Comments
 (0)