Coverage for app/core/events.py: 100%

25 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-16 20:06 +0000

1from enum import Enum 

2 

3from app.core.logging import Logger 

4from app.core.metrics import ( 

5 probe_created_total, 

6 probe_invalid_setups_total, 

7 probe_commands_total, 

8 probe_invalid_commands_total, 

9) 

10 

11 

12class ProbeEvents(Enum): 

13 PROBE_CREATED = "probe_created" 

14 PROBE_COMMAND_SENT = "probe_command_sent" 

15 PROBE_INVALID_SETUP = "probe_invalid_setup" 

16 PROBE_INVALID_COMMAND = "probe_invalid_command" 

17 

18 

19class ProbeMetrics: 

20 @staticmethod 

21 def inc_probe_created(): 

22 probe_created_total.inc() 

23 Logger.log( 

24 "probe_created_total", 

25 operation="increment", 

26 new_value=probe_created_total._value.get(), 

27 ) 

28 

29 @staticmethod 

30 def inc_probe_command_sent(): 

31 probe_commands_total.inc() 

32 Logger.log( 

33 "probe_commands_total", 

34 operation="increment", 

35 new_value=probe_commands_total._value.get(), 

36 ) 

37 

38 @staticmethod 

39 def inc_probe_invalid_setup(): 

40 probe_invalid_setups_total.inc() 

41 Logger.log( 

42 "probe_invalid_setups_total", 

43 operation="increment", 

44 new_value=probe_invalid_setups_total._value.get(), 

45 ) 

46 

47 @staticmethod 

48 def inc_probe_invalid_command(): 

49 probe_invalid_commands_total.inc() 

50 Logger.log( 

51 "probe_invalid_commands_total", 

52 operation="increment", 

53 new_value=probe_invalid_commands_total._value.get(), 

54 )