-
Notifications
You must be signed in to change notification settings - Fork 262
Expand file tree
/
Copy pathserver_multicast_writer_media.go
More file actions
123 lines (104 loc) · 2.54 KB
/
server_multicast_writer_media.go
File metadata and controls
123 lines (104 loc) · 2.54 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package gortsplib
import (
"context"
"fmt"
"net"
"time"
"github.com/bluenviron/gortsplib/v5/internal/asyncprocessor"
"github.com/bluenviron/gortsplib/v5/pkg/description"
"github.com/bluenviron/gortsplib/v5/pkg/liberrors"
"github.com/pion/rtcp"
)
// serverMulticastWriterMedia holds the per-media state used to send packets
// to a multicast group: a pair of UDP listeners (RTP and RTCP), the
// destination addresses, and a queue-backed writer.
type serverMulticastWriterMedia struct {
	// Inputs. None of these are assigned by the methods in this file; they are
	// read by initialize and writePacketRTCP, so they are expected to be filled
	// in by whoever creates the struct.
	media                            *description.Media
	maxPacketSize, udpReadBufferSize int
	listenPacket                     func(network string, address string) (net.PacketConn, error)
	writeQueueSize                   int
	writeTimeout                     time.Duration
	ip                               net.IP
	rtpPort, rtcpPort                int
	// srtpOutCtx, when non-nil, makes writePacketRTCP encrypt outgoing RTCP.
	srtpOutCtx *wrappedSRTPContext

	// State populated by initialize.
	rtpl, rtcpl       *serverUDPListener
	writer            *asyncprocessor.Processor
	rtpAddr, rtcpAddr *net.UDPAddr
	formats           map[uint8]*serverMulticastWriterFormat
}
// initialize creates the RTP/RTCP multicast listener pair, records the
// destination addresses (smm.ip combined with the RTP and RTCP ports),
// initializes and starts the write processor, and allocates an empty
// formats map.
//
// If the listener pair cannot be created, the error is returned as-is and
// no field of smm is modified.
func (smm *serverMulticastWriterMedia) initialize() error {
	rtpListener, rtcpListener, err := createUDPListenerMulticastPair(
		smm.udpReadBufferSize,
		smm.listenPacket,
		smm.writeTimeout,
		smm.rtpPort,
		smm.rtcpPort,
		smm.ip,
	)
	if err != nil {
		return err
	}

	smm.rtpl, smm.rtcpl = rtpListener, rtcpListener
	smm.rtpAddr = &net.UDPAddr{IP: smm.ip, Port: smm.rtpPort}
	smm.rtcpAddr = &net.UDPAddr{IP: smm.ip, Port: smm.rtcpPort}

	// The processor's OnError callback is deliberately a no-op.
	ignoreError := func(_ context.Context, _ error) {}
	smm.writer = &asyncprocessor.Processor{
		BufferSize: smm.writeQueueSize,
		OnError:    ignoreError,
	}
	smm.writer.Initialize()
	smm.writer.Start()

	smm.formats = make(map[uint8]*serverMulticastWriterFormat)

	return nil
}
// close releases everything owned by the media: every registered format
// first, then the RTP and RTCP listeners, and finally the writer.
func (smm *serverMulticastWriterMedia) close() {
	for payloadType := range smm.formats {
		smm.formats[payloadType].close()
	}

	// Listeners are closed before the writer, in this order.
	smm.rtpl.close()
	smm.rtcpl.close()

	smm.writer.Close()
}
// writePacketRTCP marshals pkt and queues it for delivery to the RTCP
// multicast address.
//
// The marshaled packet must not exceed smm.maxPacketSize; when an outgoing
// SRTP context is set, the limit is reduced by srtcpOverhead and the packet
// is encrypted before being queued.
//
// It returns the marshal or encryption error unchanged, a "packet is too big"
// error when the size limit is exceeded, or whatever writePacketRTCPEncoded
// returns.
func (smm *serverMulticastWriterMedia) writePacketRTCP(pkt rtcp.Packet) error {
	plain, err := pkt.Marshal()
	if err != nil {
		return err
	}

	secure := smm.srtpOutCtx != nil

	// Reserve room for the SRTCP overhead when the packet will be encrypted.
	limit := smm.maxPacketSize
	if secure {
		limit -= srtcpOverhead
	}

	if len(plain) > limit {
		return fmt.Errorf("packet is too big")
	}

	if !secure {
		return smm.writePacketRTCPEncoded(plain)
	}

	// The destination buffer is sized to the full (encrypted) packet limit.
	buf := make([]byte, smm.maxPacketSize)
	encr, err := smm.srtpOutCtx.encryptRTCP(buf, plain, nil)
	if err != nil {
		return err
	}

	return smm.writePacketRTCPEncoded(encr)
}
// writePacketRTCPEncoded pushes a job onto the writer that sends payload,
// as-is, to smm.rtcpAddr through the RTCP listener.
//
// It returns liberrors.ErrServerWriteQueueFull when the writer refuses the
// job, and nil otherwise. The result of the write itself is returned to the
// writer, not to the caller.
func (smm *serverMulticastWriterMedia) writePacketRTCPEncoded(payload []byte) error {
	send := func() error {
		return smm.rtcpl.write(payload, smm.rtcpAddr)
	}

	if smm.writer.Push(send) {
		return nil
	}

	return liberrors.ErrServerWriteQueueFull{}
}