Skip to content

Commit b3a1295

Browse files
committed
chore: slightly refactor miniPuller
1 parent 83ee7b3 commit b3a1295

File tree

3 files changed

+35
-23
lines changed

3 files changed

+35
-23
lines changed

SushitrainCore/src/puller.go

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"encoding/base64"
66
"errors"
7+
"io"
78
"math"
89
"slices"
910
"time"
@@ -23,6 +24,7 @@ type miniPuller struct {
2324
measurements *Measurements
2425
experiences map[protocol.DeviceID]bool
2526
context context.Context
27+
internals *syncthing.Internals
2628
}
2729

2830
func ClearBlockCache() {
@@ -49,7 +51,7 @@ func (mp *miniPuller) downloadRange(m *syncthing.Internals, folderID string, fil
4951

5052
// Fetch block
5153
block := file.Blocks[blockIndex]
52-
buf, err := mp.downloadBock(m, folderID, int(blockIndex), file, block)
54+
buf, err := mp.downloadBock(folderID, int(blockIndex), file, block)
5355
if err != nil {
5456
slog.Warn("error downloading block", "index", blockIndex, "total", len(file.Blocks), "cause", err)
5557
return 0, err
@@ -85,7 +87,7 @@ func (mp *miniPuller) timeoutFor(block *protocol.BlockInfo) time.Duration {
8587
return time.Duration(max(1.0, float32(block.Size)/float32(minBytesPerSecond))) * time.Second
8688
}
8789

88-
func (mp *miniPuller) downloadBock(m *syncthing.Internals, folderID string, blockIndex int, file protocol.FileInfo, block protocol.BlockInfo) ([]byte, error) {
90+
func (mp *miniPuller) downloadBock(folderID string, blockIndex int, file protocol.FileInfo, block protocol.BlockInfo) ([]byte, error) {
8991
blockHashString := base64.StdEncoding.EncodeToString([]byte(block.Hash))
9092

9193
// Do we have this file in the local cache?
@@ -94,7 +96,7 @@ func (mp *miniPuller) downloadBock(m *syncthing.Internals, folderID string, bloc
9496
return cached, nil
9597
}
9698

97-
availables, err := m.BlockAvailability(folderID, file, block)
99+
availables, err := mp.internals.BlockAvailability(folderID, file, block)
98100
if err != nil {
99101
return nil, err
100102
}
@@ -132,13 +134,13 @@ func (mp *miniPuller) downloadBock(m *syncthing.Internals, folderID string, bloc
132134

133135
if exp, ok := mp.experiences[available.ID]; ok && exp {
134136
// Skip devices we're not connected to
135-
if !m.IsConnectedTo(available.ID) {
137+
if !mp.internals.IsConnectedTo(available.ID) {
136138
continue
137139
}
138140

139141
downloadBlockCtx, cancelDownloadBlock := context.WithTimeout(mp.context, mp.timeoutFor(&block))
140142
defer cancelDownloadBlock()
141-
buf, err := m.DownloadBlock(downloadBlockCtx, available.ID, folderID, file.Name, int(blockIndex), block, available.FromTemporary)
143+
buf, err := mp.internals.DownloadBlock(downloadBlockCtx, available.ID, folderID, file.Name, int(blockIndex), block, available.FromTemporary)
142144
// Remember our experience with this peer for next time
143145
mp.experiences[available.ID] = err == nil || err == context.Canceled
144146
if err == nil {
@@ -159,13 +161,13 @@ func (mp *miniPuller) downloadBock(m *syncthing.Internals, folderID string, bloc
159161

160162
if _, ok := mp.experiences[available.ID]; !ok {
161163
// Skip devices we're not connected to
162-
if !m.IsConnectedTo(available.ID) {
164+
if !mp.internals.IsConnectedTo(available.ID) {
163165
continue
164166
}
165167

166168
downloadBlockCtx, cancelDownloadBlock := context.WithTimeout(mp.context, mp.timeoutFor(&block))
167169
defer cancelDownloadBlock()
168-
buf, err := m.DownloadBlock(downloadBlockCtx, available.ID, folderID, file.Name, int(blockIndex), block, available.FromTemporary)
170+
buf, err := mp.internals.DownloadBlock(downloadBlockCtx, available.ID, folderID, file.Name, int(blockIndex), block, available.FromTemporary)
169171
// Remember our experience with this peer for next time
170172
mp.experiences[available.ID] = err == nil || err == context.Canceled
171173
if err == nil {
@@ -186,13 +188,13 @@ func (mp *miniPuller) downloadBock(m *syncthing.Internals, folderID string, bloc
186188

187189
if exp, ok := mp.experiences[available.ID]; ok && !exp {
188190
// Skip devices we're not connected to
189-
if !m.IsConnectedTo(available.ID) {
191+
if !mp.internals.IsConnectedTo(available.ID) {
190192
continue
191193
}
192194

193195
downloadBlockCtx, cancelDownloadBlock := context.WithTimeout(mp.context, mp.timeoutFor(&block))
194196
defer cancelDownloadBlock()
195-
buf, err := m.DownloadBlock(downloadBlockCtx, available.ID, folderID, file.Name, int(blockIndex), block, available.FromTemporary)
197+
buf, err := mp.internals.DownloadBlock(downloadBlockCtx, available.ID, folderID, file.Name, int(blockIndex), block, available.FromTemporary)
196198
// Remember our experience with this peer for next time
197199
mp.experiences[available.ID] = err == nil || err == context.Canceled
198200
if err == nil {
@@ -207,10 +209,25 @@ func (mp *miniPuller) downloadBock(m *syncthing.Internals, folderID string, bloc
207209
return nil, errors.New("no peer to download this block from")
208210
}
209211

210-
func newMiniPuller(ctx context.Context, measurements *Measurements) *miniPuller {
212+
func newMiniPuller(ctx context.Context, measurements *Measurements, internals *syncthing.Internals) *miniPuller {
211213
return &miniPuller{
212214
experiences: map[protocol.DeviceID]bool{},
213215
context: ctx,
214216
measurements: measurements,
217+
internals: internals,
215218
}
216219
}
220+
221+
func (mp *miniPuller) DownloadInto(w io.Writer, folderID string, info protocol.FileInfo) error {
222+
for blockNo, block := range info.Blocks {
223+
buf, err := mp.downloadBock(folderID, blockNo, info, block)
224+
if err != nil {
225+
return err
226+
}
227+
_, err = w.Write(buf)
228+
if err != nil {
229+
return err
230+
}
231+
}
232+
return nil
233+
}

SushitrainCore/src/server.go

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -218,7 +218,7 @@ func serveEntry(w http.ResponseWriter, r *http.Request, folderID string, entry *
218218
return
219219
}
220220

221-
mp := newMiniPuller(r.Context(), measurements)
221+
mp := newMiniPuller(r.Context(), measurements, m)
222222

223223
blockSize := int64(info.BlockSize())
224224
for _, rng := range parsedRanges {
@@ -259,7 +259,7 @@ func serveEntry(w http.ResponseWriter, r *http.Request, folderID string, entry *
259259

260260
// Fetch block
261261
block := info.Blocks[blockIndex]
262-
buf, err := mp.downloadBock(m, folderID, int(blockIndex), info, block)
262+
buf, err := mp.downloadBock(folderID, int(blockIndex), info, block)
263263
if err != nil {
264264
slog.Warn("error downloading block", "blockIndex", blockIndex, "blockCount", len(info.Blocks), "cause", err)
265265

@@ -307,16 +307,11 @@ func serveEntry(w http.ResponseWriter, r *http.Request, folderID string, entry *
307307
// We have this file completely locally
308308
w.Write(buffer)
309309
} else {
310-
fetchedBytes := int64(0)
311-
mp := newMiniPuller(r.Context(), measurements)
312-
313-
for blockNo, block := range info.Blocks {
314-
buf, err := mp.downloadBock(m, folderID, blockNo, info, block)
315-
if err != nil {
316-
return
317-
}
318-
fetchedBytes += int64(block.Size)
319-
w.Write(buf)
310+
mp := newMiniPuller(r.Context(), measurements, m)
311+
err := mp.DownloadInto(w, folderID, info)
312+
if err != nil {
313+
slog.Error("downloading block", "cause", err)
314+
return // Can't write an HTTP header anymore
320315
}
321316
}
322317
}

SushitrainCore/src/zip.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ func (e *Entry) Archive() Archive {
5454
ctx := context.Background()
5555
return &entryArchive{
5656
entry: e,
57-
puller: newMiniPuller(ctx, e.Folder.client.Measurements),
57+
puller: newMiniPuller(ctx, e.Folder.client.Measurements, e.Folder.client.app.Internals),
5858
mutex: sync.Mutex{},
5959
files: nil,
6060
}

0 commit comments

Comments
 (0)