4242import threading
4343import time
4444
45- __all__ = [' Sender' , ' init' , ' send' ]
45+ __all__ = [" Sender" , " init" , " send" ]
4646
47- __version__ = ' 1.7.1'
47+ __version__ = " 1.7.1"
4848
4949default_sender = None
5050logger = logging .getLogger (__name__ )
@@ -55,9 +55,20 @@ def _has_whitespace(value):
5555
5656
5757class Sender :
58- def __init__ (self , host , port = 2003 , prefix = None , timeout = 5 , interval = None ,
59- queue_size = None , log_sends = False , protocol = 'tcp' ,
60- batch_size = 1000 , tags = None , raise_send_errors = False ):
58+ def __init__ (
59+ self ,
60+ host ,
61+ port = 2003 ,
62+ prefix = None ,
63+ timeout = 5 ,
64+ interval = None ,
65+ queue_size = None ,
66+ log_sends = False ,
67+ protocol = "tcp" ,
68+ batch_size = 1000 ,
69+ tags = None ,
70+ raise_send_errors = False ,
71+ ):
6172 """Initialize a Sender instance, starting the background thread to
6273 send messages at given interval (in seconds) if "interval" is not
6374 None. Send at most "batch_size" messages per socket send operation.
@@ -83,7 +94,7 @@ def __init__(self, host, port=2003, prefix=None, timeout=5, interval=None,
8394
8495 if self .interval is not None :
8596 if raise_send_errors :
86- raise ValueError (' raise_send_errors must be disabled when interval is set' )
97+ raise ValueError (" raise_send_errors must be disabled when interval is set" )
8798 if queue_size is None :
8899 queue_size = int (round (interval )) * 100
89100 self ._queue = queue .Queue (maxsize = queue_size )
@@ -111,19 +122,18 @@ def build_message(self, metric, value, timestamp, tags=None):
111122 if _has_whitespace (metric ):
112123 raise ValueError ('"metric" must not have whitespace in it' )
113124 if not isinstance (value , (int , float )):
114- raise TypeError ('"value" must be an int or a float, not a %s' ,
115- type (value ).__name__ )
125+ raise TypeError ('"value" must be an int or a float, not a %s' , type (value ).__name__ )
116126
117127 all_tags = self .tags .copy ()
118128 all_tags .update (tags )
119- tags_strs = [f' ;{ k } ={ v } ' for k , v in sorted (all_tags .items ())]
129+ tags_strs = [f" ;{ k } ={ v } " for k , v in sorted (all_tags .items ())]
120130 if any (_has_whitespace (t ) for t in tags_strs ):
121131 raise ValueError ('"tags" keys and values must not have whitespace in them' )
122- tags_suffix = '' .join (tags_strs )
132+ tags_suffix = "" .join (tags_strs )
123133
124- prefix = self .prefix + '.' if self .prefix else ''
134+ prefix = self .prefix + "." if self .prefix else ""
125135 message = f"{ prefix } { metric } { tags_suffix } { value } { int (round (timestamp ))} \n "
126- message = message .encode (' utf-8' )
136+ message = message .encode (" utf-8" )
127137 return message
128138
129139 def send (self , metric , value , timestamp = None , tags = None ):
@@ -147,18 +157,18 @@ def send(self, metric, value, timestamp=None, tags=None):
147157 try :
148158 self ._queue .put_nowait (message )
149159 except queue .Full :
150- logger .error (' queue full when sending %s' , message )
160+ logger .error (" queue full when sending %s" , message )
151161
152162 def send_message (self , message ):
153- if self .protocol == ' tcp' :
163+ if self .protocol == " tcp" :
154164 with socket .create_connection ((self .host , self .port ), self .timeout ) as sock :
155165 sock .setsockopt (socket .SOL_SOCKET , socket .SO_REUSEADDR , 1 )
156166 sock .sendall (message )
157- elif self .protocol == ' udp' :
167+ elif self .protocol == " udp" :
158168 with socket .socket (socket .AF_INET , socket .SOCK_DGRAM ) as sock :
159169 sock .sendto (message , (self .host , self .port ))
160170 else :
161- raise ValueError ('" protocol" must be \ ' tcp\ ' or \ ' udp\ ' , not %s' , self .protocol )
171+ raise ValueError (" \" protocol\ " must be 'tcp' or 'udp', not %s" , self .protocol )
162172
163173 def send_socket (self , message ):
164174 """Low-level function to send message bytes to this Sender's socket.
@@ -172,12 +182,11 @@ def send_socket(self, message):
172182 except Exception as error :
173183 if self .raise_send_errors :
174184 raise
175- logger .error (' error sending message %s: %s' , message , error )
185+ logger .error (" error sending message %s: %s" , message , error )
176186 else :
177187 if self .log_sends :
178188 elapsed_time = time .time () - start_time
179- logger .info ('sent message %s to %s:%s in %s seconds' ,
180- message , self .host , self .port , elapsed_time )
189+ logger .info ("sent message %s to %s:%s in %s seconds" , message , self .host , self .port , elapsed_time )
181190
182191 def _thread_loop (self ):
183192 """Background thread used when Sender is in asynchronous/interval mode."""
@@ -218,14 +227,15 @@ def _thread_loop(self):
218227 if current_time - last_check_time >= self .interval :
219228 last_check_time = current_time
220229 for i in range (0 , len (messages ), self .batch_size ):
221- batch = messages [i : i + self .batch_size ]
222- self .send_socket (b'' .join (batch ))
230+ batch = messages [i : i + self .batch_size ]
231+ self .send_socket (b"" .join (batch ))
223232 messages = []
224233
225234 # Send any final messages before exiting thread
226235 for i in range (0 , len (messages ), self .batch_size ):
227- batch = messages [i :i + self .batch_size ]
228- self .send_socket (b'' .join (batch ))
236+ batch = messages [i : i + self .batch_size ]
237+ self .send_socket (b"" .join (batch ))
238+
229239
230240def init (* args , ** kwargs ):
231241 """Initialize default Sender instance with given args."""
@@ -238,29 +248,23 @@ def send(*args, **kwargs):
238248 default_sender .send (* args , ** kwargs )
239249
240250
241- if __name__ == ' __main__' :
251+ if __name__ == " __main__" :
242252 import argparse
243253
244254 parser = argparse .ArgumentParser ()
245- parser .add_argument ('metric' ,
246- help = 'name of metric to send' )
247- parser .add_argument ('value' , type = float ,
248- help = 'numeric value to send' )
249- parser .add_argument ('-s' , '--server' , default = 'localhost' ,
250- help = 'hostname of Graphite server to send to, default %(default)s' )
251- parser .add_argument ('-p' , '--port' , type = int , default = 2003 ,
252- help = 'port to send message to, default %(default)d' )
253- parser .add_argument ('-u' , '--udp' , action = 'store_true' ,
254- help = 'send via UDP instead of TCP' )
255- parser .add_argument ('-t' , '--timestamp' , type = int ,
256- help = 'Unix timestamp for message (defaults to current time)' )
257- parser .add_argument ('-q' , '--quiet' , action = 'store_true' ,
258- help = "quiet mode (don't log send to stdout)" )
255+ parser .add_argument ("metric" , help = "name of metric to send" )
256+ parser .add_argument ("value" , type = float , help = "numeric value to send" )
257+ parser .add_argument (
258+ "-s" , "--server" , default = "localhost" , help = "hostname of Graphite server to send to, default %(default)s"
259+ )
260+ parser .add_argument ("-p" , "--port" , type = int , default = 2003 , help = "port to send message to, default %(default)d" )
261+ parser .add_argument ("-u" , "--udp" , action = "store_true" , help = "send via UDP instead of TCP" )
262+ parser .add_argument ("-t" , "--timestamp" , type = int , help = "Unix timestamp for message (defaults to current time)" )
263+ parser .add_argument ("-q" , "--quiet" , action = "store_true" , help = "quiet mode (don't log send to stdout)" )
259264 args = parser .parse_args ()
260265
261266 if not args .quiet :
262- logging .basicConfig (level = logging .INFO , format = ' %(message)s' )
267+ logging .basicConfig (level = logging .INFO , format = " %(message)s" )
263268
264- sender = Sender (args .server , port = args .port , log_sends = not args .quiet ,
265- protocol = 'udp' if args .udp else 'tcp' )
269+ sender = Sender (args .server , port = args .port , log_sends = not args .quiet , protocol = "udp" if args .udp else "tcp" )
266270 sender .send (args .metric , args .value , timestamp = args .timestamp )
0 commit comments