-
Notifications
You must be signed in to change notification settings - Fork 5.1k
Expand file tree
/
Copy pathCommitChunkHandler.cs
More file actions
134 lines (120 loc) · 5.75 KB
/
CommitChunkHandler.cs
File metadata and controls
134 lines (120 loc) · 5.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.Threading.Tasks;
using System.Threading;
using Azure.Storage.Common;
namespace Azure.Storage.DataMovement
{
internal class CommitChunkHandler : IDisposable
{
#region Delegate Definitions
public delegate Task QueuePutBlockTaskInternal(long offset, long blockSize, long expectedLength, StorageResourceItemProperties properties);
public delegate Task QueueCommitBlockTaskInternal(StorageResourceItemProperties sourceProperties);
public delegate void ReportProgressInBytes(long bytesWritten);
public delegate Task InvokeFailedEventHandlerInternal(Exception ex);
#endregion Delegate Definitions
private readonly QueuePutBlockTaskInternal _queuePutBlockTask;
private readonly QueueCommitBlockTaskInternal _queueCommitBlockTask;
private readonly ReportProgressInBytes _reportProgressInBytes;
private readonly InvokeFailedEventHandlerInternal _invokeFailedEventHandler;
public struct Behaviors
{
public QueuePutBlockTaskInternal QueuePutBlockTask { get; set; }
public QueueCommitBlockTaskInternal QueueCommitBlockTask { get; set; }
public ReportProgressInBytes ReportProgressInBytes { get; set; }
public InvokeFailedEventHandlerInternal InvokeFailedHandler { get; set; }
}
/// <summary>
/// Create channel of <see cref="QueueStageChunkArgs"/> to keep track of that are
/// waiting to update the bytesTransferred and other required operations.
/// </summary>
private readonly IProcessor<QueueStageChunkArgs> _stageChunkProcessor;
private readonly CancellationToken _cancellationToken;
private long _bytesTransferred;
private readonly long _expectedLength;
private readonly long _blockSize;
private readonly DataTransferOrder _transferOrder;
private readonly StorageResourceItemProperties _sourceProperties;
public CommitChunkHandler(
long expectedLength,
long blockSize,
Behaviors behaviors,
DataTransferOrder transferOrder,
StorageResourceItemProperties sourceProperties,
CancellationToken cancellationToken)
{
if (expectedLength <= 0)
{
throw Errors.InvalidExpectedLength(expectedLength);
}
Argument.AssertNotNull(behaviors, nameof(behaviors));
_cancellationToken = cancellationToken;
// Set bytes transferred to block size because we transferred the initial block
_bytesTransferred = blockSize;
_expectedLength = expectedLength;
_blockSize = blockSize;
_transferOrder = transferOrder;
_sourceProperties = sourceProperties;
_queuePutBlockTask = behaviors.QueuePutBlockTask
?? throw Errors.ArgumentNull(nameof(behaviors.QueuePutBlockTask));
_queueCommitBlockTask = behaviors.QueueCommitBlockTask
?? throw Errors.ArgumentNull(nameof(behaviors.QueueCommitBlockTask));
_reportProgressInBytes = behaviors.ReportProgressInBytes
?? throw Errors.ArgumentNull(nameof(behaviors.ReportProgressInBytes));
_invokeFailedEventHandler = behaviors.InvokeFailedHandler
?? throw Errors.ArgumentNull(nameof(behaviors.InvokeFailedHandler));
_stageChunkProcessor = ChannelProcessing.NewProcessor<QueueStageChunkArgs>(
readers: 1,
capacity: DataMovementConstants.Channels.StageChunkCapacity);
_stageChunkProcessor.Process = ProcessCommitRange;
}
public void Dispose()
{
_stageChunkProcessor.TryComplete();
}
public async ValueTask QueueChunkAsync(QueueStageChunkArgs args)
{
await _stageChunkProcessor.QueueAsync(args).ConfigureAwait(false);
}
private async Task ProcessCommitRange(QueueStageChunkArgs args, CancellationToken cancellationToken = default)
{
try
{
_bytesTransferred += args.BytesTransferred;
_reportProgressInBytes(args.BytesTransferred);
if (_bytesTransferred == _expectedLength)
{
// Add CommitBlockList task to the channel
await _queueCommitBlockTask(_sourceProperties).ConfigureAwait(false);
}
else if (_bytesTransferred < _expectedLength)
{
// If this is a sequential transfer, we need to queue the next chunk
if (_transferOrder == DataTransferOrder.Sequential)
{
long newOffset = args.Offset + _blockSize;
long blockLength = (newOffset + _blockSize < _expectedLength) ?
_blockSize :
_expectedLength - newOffset;
await _queuePutBlockTask(
newOffset,
blockLength,
_expectedLength,
_sourceProperties).ConfigureAwait(false);
}
}
else // _bytesTransferred > _expectedLength
{
throw Errors.MismatchLengthTransferred(
expectedLength: _expectedLength,
actualLength: _bytesTransferred);
}
}
catch (Exception ex)
{
await _invokeFailedEventHandler(ex).ConfigureAwait(false);
}
}
}
}