-
Notifications
You must be signed in to change notification settings - Fork 24
Expand file tree
/
Copy pathPriceTrackingActor.cs
More file actions
108 lines (90 loc) · 3.55 KB
/
Copy pathPriceTrackingActor.cs
File metadata and controls
108 lines (90 loc) · 3.55 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
// -----------------------------------------------------------------------
// <copyright file="PriceTrackingActor.cs" company="Petabridge, LLC">
// Copyright (C) 2015 - 2019 Petabridge, LLC <https://petabridge.com>
// </copyright>
// -----------------------------------------------------------------------
using System;
using Akka.Actor;
using Akka.CQRS.Pricing.Commands;
using Akka.CQRS.Pricing.Events;
using Akka.CQRS.Pricing.Views;
using Petabridge.Cmd;
namespace Akka.CQRS.Pricing.Cli
{
/// <summary>
/// Actor responsible for populating the output for the <see cref="PricingCmd.TrackPrice"/> command.
/// </summary>
public sealed class PriceTrackingActor : ReceiveActor, IWithUnboundedStash
{
private readonly string _tickerSymbol;
private readonly IActorRef _priceViewActor;
private readonly IActorRef _commandHandlerActor;
private ICancelable _priceCheckInterval;
private IPriceUpdate _currentPrice;
public PriceTrackingActor(string tickerSymbol, IActorRef priceViewActor, IActorRef commandHandlerActor)
{
_priceViewActor = priceViewActor;
_commandHandlerActor = commandHandlerActor;
_tickerSymbol = tickerSymbol;
WaitingForPriceHistory();
}
private void WaitingForPriceHistory()
{
Receive<PriceHistory>(p =>
{
if (p.HistoricalPrices.IsEmpty)
{
_commandHandlerActor.Tell(new CommandResponse($"No historical price data for [{_tickerSymbol}] - waiting for updates.", false));
BecomePriceUpdates();
return;
}
_currentPrice = p.CurrentPriceUpdate;
foreach (var e in p.HistoricalPrices)
{
_commandHandlerActor.Tell(new CommandResponse(e.ToString(), false));
}
BecomePriceUpdates();
});
Receive<ReceiveTimeout>(t =>
{
_commandHandlerActor.Tell(new CommandResponse($"No historical price data for [{_tickerSymbol}] - waiting for updates.", false));
BecomePriceUpdates();
});
ReceiveAny(_ => Stash.Stash());
}
private void BecomePriceUpdates()
{
Context.SetReceiveTimeout(null);
Become(PriceUpdates);
Stash.UnstashAll();
}
private void PriceUpdates()
{
Receive<IPriceUpdate>(p =>
{
_currentPrice = p;
_commandHandlerActor.Tell(new CommandResponse(p.ToString(), false));
});
Receive<Terminated>(t =>
{
_commandHandlerActor.Tell(new CommandResponse("Price View Actor terminated."));
Context.Stop(Self);
});
}
protected override void PreStart()
{
var getlatestPrice = new GetLatestPrice(_tickerSymbol);
// get the historical price
_priceViewActor.Tell(new GetPriceHistory(_tickerSymbol));
_priceCheckInterval = Context.System.Scheduler.ScheduleTellRepeatedlyCancelable(TimeSpan.FromSeconds(3),
TimeSpan.FromSeconds(3), _priceViewActor, getlatestPrice, Self);
Context.SetReceiveTimeout(TimeSpan.FromSeconds(1));
Context.Watch(_priceViewActor);
}
protected override void PostStop()
{
_priceCheckInterval.Cancel();
}
public IStash Stash { get; set; }
}
}