-
Notifications
You must be signed in to change notification settings - Fork 512
Expand file tree
/
Copy pathAbstractReader.cs
More file actions
271 lines (234 loc) · 7.65 KB
/
Copy pathAbstractReader.cs
File metadata and controls
271 lines (234 loc) · 7.65 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using SharpCompress.Common;
using SharpCompress.IO;
namespace SharpCompress.Readers;
/// <summary>
/// A generic push reader that reads unseekable comrpessed streams.
/// </summary>
public abstract partial class AbstractReader<TEntry, TVolume> : IReader, IAsyncReader
where TEntry : Entry
where TVolume : Volume
{
private bool _completed;
private IEnumerator<TEntry>? _entriesForCurrentReadStream;
private IAsyncEnumerator<TEntry>? _entriesForCurrentReadStreamAsync;
private bool _wroteCurrentEntry;
private readonly bool _disposeVolume;
internal AbstractReader(ReaderOptions options, ArchiveType type, bool disposeVolume = true)
{
Type = type;
_disposeVolume = disposeVolume;
Options = options;
}
internal ReaderOptions Options { get; }
public ArchiveType Type { get; }
/// <summary>
/// Current volume that the current entry resides in
/// </summary>
public abstract TVolume? Volume { get; }
/// <summary>
/// Current file entry (from either sync or async enumeration).
/// </summary>
public TEntry Entry
{
get
{
if (_entriesForCurrentReadStreamAsync is not null)
{
return _entriesForCurrentReadStreamAsync.Current;
}
return _entriesForCurrentReadStream.NotNull().Current;
}
}
#region IDisposable Members
public virtual void Dispose()
{
_entriesForCurrentReadStream?.Dispose();
if (_disposeVolume)
{
Volume?.Dispose();
}
}
#endregion
public bool Cancelled { get; private set; }
/// <summary>
/// Indicates that the remaining entries are not required.
/// On dispose of an EntryStream, the stream will not skip to the end of the entry.
/// An attempt to move to the next entry will throw an exception, as the compressed stream is not positioned at an entry boundary.
/// </summary>
public void Cancel()
{
if (!_completed)
{
Cancelled = true;
}
}
public bool MoveToNextEntry()
{
if (_entriesForCurrentReadStreamAsync is not null)
{
throw new ArchiveOperationException(
$"{nameof(MoveToNextEntry)} cannot be used after {nameof(MoveToNextEntryAsync)} has been used."
);
}
if (_completed)
{
return false;
}
if (Cancelled)
{
throw new ReaderCancelledException("Reader has been cancelled.");
}
if (_entriesForCurrentReadStream is null)
{
return LoadStreamForReading(RequestInitialStream());
}
if (!_wroteCurrentEntry)
{
SkipEntry();
}
_wroteCurrentEntry = false;
if (NextEntryForCurrentStream())
{
return true;
}
_completed = true;
return false;
}
protected bool LoadStreamForReading(Stream stream)
{
if (_entriesForCurrentReadStreamAsync is not null)
{
throw new ArchiveOperationException(
$"{nameof(LoadStreamForReading)} cannot be used after {nameof(LoadStreamForReadingAsync)} has been used."
);
}
_entriesForCurrentReadStream?.Dispose();
if (stream is null || !stream.CanRead)
{
throw new MultipartStreamRequiredException(
"File is split into multiple archives: '"
+ Entry.Key
+ "'. A new readable stream is required. Use Cancel if it was intended."
);
}
_entriesForCurrentReadStream = GetEntries(stream).GetEnumerator();
return _entriesForCurrentReadStream.MoveNext();
}
protected virtual Stream RequestInitialStream() =>
Volume.NotNull("Volume isn't loaded.").Stream;
protected virtual ValueTask<Stream> RequestInitialStreamAsync(
CancellationToken cancellationToken = default
) => new(RequestInitialStream());
internal virtual bool NextEntryForCurrentStream() =>
_entriesForCurrentReadStream.NotNull().MoveNext();
protected abstract IEnumerable<TEntry> GetEntries(Stream stream);
#region Entry Skip/Write
private void SkipEntry()
{
if (!Entry.IsDirectory)
{
Skip();
}
}
private void Skip()
{
var part = Entry.Parts.First();
if (!Entry.IsSplitAfter && !Entry.IsSolid && Entry.CompressedSize > 0)
{
//not solid and has a known compressed size then we can skip raw bytes.
var rawStream = part.GetRawStream();
if (rawStream != null)
{
var bytesToAdvance = Entry.CompressedSize;
rawStream.Skip(bytesToAdvance);
part.Skipped = true;
return;
}
}
//don't know the size so we have to try to decompress to skip
using var s = OpenEntryStream();
s.SkipEntry();
}
public void WriteEntryTo(Stream writableStream) =>
WriteEntryTo(writableStream, Options.BufferSize);
private void WriteEntryTo(Stream writableStream, int bufferSize)
{
if (_wroteCurrentEntry)
{
throw new ArgumentException("WriteEntryTo or OpenEntryStream can only be called once.");
}
ThrowHelper.ThrowIfNull(writableStream);
if (!writableStream.CanWrite)
{
throw new ArgumentException(
"A writable Stream was required. Use Cancel if that was intended."
);
}
Write(writableStream, bufferSize);
_wroteCurrentEntry = true;
}
internal void Write(Stream writeStream) => Write(writeStream, Options.BufferSize);
internal void Write(Stream writeStream, int bufferSize)
{
using Stream s = OpenEntryStream();
var sourceStream = WrapWithProgress(s, Entry);
sourceStream.CopyTo(writeStream, bufferSize);
}
private Stream WrapWithProgress(Stream source, Entry entry)
{
var progress = Options.Progress;
if (progress is null)
{
return source;
}
var entryPath = entry.Key ?? string.Empty;
long? totalBytes = GetEntrySizeSafe(entry);
return new ProgressReportingStream(
source,
progress,
entryPath,
totalBytes,
leaveOpen: true
);
}
private static long? GetEntrySizeSafe(Entry entry)
{
try
{
var size = entry.Size;
// Return the actual size (including 0 for empty entries)
// Negative values indicate unknown size
return size >= 0 ? size : null;
}
catch (NotImplementedException)
{
return null;
}
}
public EntryStream OpenEntryStream()
{
if (_wroteCurrentEntry)
{
throw new ArgumentException("WriteEntryTo or OpenEntryStream can only be called once.");
}
var stream = GetEntryStream();
_wroteCurrentEntry = true;
return stream;
}
/// <summary>
/// Retains a reference to the entry stream, so we can check whether it completed later.
/// </summary>
protected EntryStream CreateEntryStream(Stream? decompressed) =>
new(this, decompressed.NotNull());
protected virtual EntryStream GetEntryStream() =>
CreateEntryStream(Entry.Parts.First().GetCompressedStream());
#endregion
IEntry IReader.Entry => Entry;
IEntry IAsyncReader.Entry => Entry;
}