Skip to content

Auto logging

AutoLoggerConfig

A Class for the Automated Logging Config

Source code in src/super_gradients/common/auto_logging/auto_logger.py
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
class AutoLoggerConfig:
    """
    A Class for the Automated Logging Config
    """

    filename: Union[str, None]

    def __init__(self):
        self.filename = None

    def _setup_default_logging(self, log_level: str = None) -> None:
        """
        Setup default logging configuration. Usually happens when app starts, and we don't have
        experiment dir yet.
        The default log directory will be `~/sg_logs`
        :param log_level: The default log level to use. If None, uses LOG_LEVEL and CONSOLE_LOG_LEVEL environment vars.
        :return: None
        """

        # There is no _easy_ way to log all events to a single file, when using DDP or DataLoader with num_workers > 1
        # on Windows platform. In both these cases a multiple processes will be spawned and multiple logs may be created.
        # Therefore the log file will have the parent PID to being able to discriminate the logs corresponding to a single run.
        timestamp = time.strftime("%Y_%m_%d_%H_%M_%S", time.localtime())
        self._setup_logging(
            filename=os.path.join(env_variables.SUPER_GRADIENTS_LOG_DIR, f"logs_{os.getppid()}_{timestamp}.log"),
            copy_already_logged_messages=False,
            filemode="w",
            log_level=log_level,
        )

    def _setup_logging(self, filename: str, copy_already_logged_messages: bool, filemode: str = "a", log_level: str = None) -> None:
        """
        Sets the logging configuration to store messages to specific file
        :param filename: Output log file
        :param filemode: Open mode for file
        :param copy_already_logged_messages: Controls whether messages from previous log configuration should be copied
               to new place. This is helpful to transfer diagnostic messages (from the app start) to experiment dir.
        :param log_level: The default log level to use. If None, uses LOG_LEVEL and CONSOLE_LOG_LEVEL environment vars.
        :return:
        """
        os.makedirs(os.path.dirname(filename), exist_ok=True)

        if copy_already_logged_messages and self.filename is not None and os.path.exists(self.filename):
            with open(self.filename, "r", encoding="utf-8") as src:
                with open(filename, "w") as dst:
                    dst.write(src.read())

        file_logging_level = log_level or env_variables.FILE_LOG_LEVEL
        console_logging_level = log_level or env_variables.CONSOLE_LOG_LEVEL

        cur_version = sys.version_info
        python_38 = (3, 8)
        python_39 = (3, 9)
        manager = logging.getLogger("").manager

        extra_kwargs = {}
        if cur_version >= python_38:
            extra_kwargs = dict(
                force=True,
            )
        else:
            # If the logging does not support force=True, we should manually delete handlers
            for h in manager.root.handlers:
                try:
                    h.close()
                except AttributeError:
                    pass
            del manager.root.handlers[:]

        if cur_version >= python_39:
            extra_kwargs["encoding"] = "utf-8"

        logging.basicConfig(
            filename=filename,
            filemode=filemode,
            format="%(asctime)s %(levelname)s - %(name)s - %(message)s",
            datefmt="[%Y-%m-%d %H:%M:%S]",
            level=file_logging_level,
            **extra_kwargs,
        )

        # Add console handler
        console_handler = logging.StreamHandler()
        console_handler.setLevel(console_logging_level)
        console_handler.setFormatter(
            logging.Formatter(
                "%(asctime)s %(levelname)s - %(filename)s - %(message)s",
                datefmt="[%Y-%m-%d %H:%M:%S]",
            )
        )
        manager.root.handlers.append(console_handler)

        self.filename = filename

    @classmethod
    def get_instance(cls):
        global _super_gradients_logger_config
        if _super_gradients_logger_config is None:
            _super_gradients_logger_config = cls()
            _super_gradients_logger_config._setup_default_logging()

        return _super_gradients_logger_config

    @classmethod
    def get_log_file_path(cls) -> str:
        """
        Return the current log file used to store log messages
        :return: Full path to log file
        """
        self = cls.get_instance()
        return self.filename

    @classmethod
    def setup_logging(cls, filename: str, copy_already_logged_messages: bool, filemode: str = "a", log_level: str = None) -> None:
        self = cls.get_instance()
        self._setup_logging(filename, copy_already_logged_messages, filemode, log_level)

get_log_file_path() classmethod

Return the current log file used to store log messages

Returns:

Type Description
str

Full path to log file

Source code in src/super_gradients/common/auto_logging/auto_logger.py
114
115
116
117
118
119
120
121
@classmethod
def get_log_file_path(cls) -> str:
    """
    Return the current log file used to store log messages
    :return: Full path to log file
    """
    self = cls.get_instance()
    return self.filename

BufferWriter

File writer buffer that opens a file only when flushing and under the condition that threshold buffersize was reached.

Source code in src/super_gradients/common/auto_logging/console_logging.py
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
class BufferWriter:
    """File writer buffer that opens a file only when flushing and under the condition that threshold buffersize was reached."""

    FILE_BUFFER_SIZE = 10_000  # Number of chars to be buffered before writing the buffer on disk.

    def __init__(self, filename: str, buffer: StringIO, buffer_size: int, lock: Lock):
        """
        :param filename:         Name of the file where to write the bugger
        :param buffer:           Buffer object
        :param buffer_size:      Number of chars to be buffered before writing the buffer on disk.
        :param lock:             Thread lock to prevent multiple threads to write at the same time
        """
        self.buffer = buffer
        self.filename = filename
        self.buffer_size = buffer_size
        self.lock = lock

    def write(self, data: str):
        """Write to buffer (not on disk)."""
        with self.lock:
            self.buffer.write(data)
        if self._require_flush():
            self.flush()

    def flush(self, force: bool = False):
        """Write the buffer on disk if relevant."""
        if force or self._require_flush():
            with self.lock:
                os.makedirs(os.path.dirname(self.filename), exist_ok=True)
                with open(self.filename, "a", encoding="utf-8") as f:
                    f.write(self.buffer.getvalue())
                    self.buffer.truncate(0)
                    self.buffer.seek(0)

    def _require_flush(self) -> bool:
        """Indicate if a buffer is needed (i.e. if buffer size above threshold)"""
        return len(self.buffer.getvalue()) > self.buffer_size

__init__(filename, buffer, buffer_size, lock)

Parameters:

Name Type Description Default
filename str

Name of the file where to write the bugger

required
buffer StringIO

Buffer object

required
buffer_size int

Number of chars to be buffered before writing the buffer on disk.

required
lock Lock

Thread lock to prevent multiple threads to write at the same time

required
Source code in src/super_gradients/common/auto_logging/console_logging.py
18
19
20
21
22
23
24
25
26
27
28
def __init__(self, filename: str, buffer: StringIO, buffer_size: int, lock: Lock):
    """
    :param filename:         Name of the file where to write the bugger
    :param buffer:           Buffer object
    :param buffer_size:      Number of chars to be buffered before writing the buffer on disk.
    :param lock:             Thread lock to prevent multiple threads to write at the same time
    """
    self.buffer = buffer
    self.filename = filename
    self.buffer_size = buffer_size
    self.lock = lock

flush(force=False)

Write the buffer on disk if relevant.

Source code in src/super_gradients/common/auto_logging/console_logging.py
37
38
39
40
41
42
43
44
45
def flush(self, force: bool = False):
    """Write the buffer on disk if relevant."""
    if force or self._require_flush():
        with self.lock:
            os.makedirs(os.path.dirname(self.filename), exist_ok=True)
            with open(self.filename, "a", encoding="utf-8") as f:
                f.write(self.buffer.getvalue())
                self.buffer.truncate(0)
                self.buffer.seek(0)

write(data)

Write to buffer (not on disk).

Source code in src/super_gradients/common/auto_logging/console_logging.py
30
31
32
33
34
35
def write(self, data: str):
    """Write to buffer (not on disk)."""
    with self.lock:
        self.buffer.write(data)
    if self._require_flush():
        self.flush()

ConsoleSink

Singleton responsible to sink the console streams (stdout/stderr) into a file.

Source code in src/super_gradients/common/auto_logging/console_logging.py
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
class ConsoleSink:
    """Singleton responsible to sink the console streams (stdout/stderr) into a file."""

    def __init__(self):
        self._setup()
        atexit.register(self._flush)  # Flush at the end of the process

    @multi_process_safe
    def _setup(self):
        """On instantiation, setup the default sink file."""
        filename = Path(env_variables.SUPER_GRADIENTS_LOG_DIR) / "console.log"
        filename.parent.mkdir(exist_ok=True)
        self.filename = str(filename)
        os.makedirs(os.path.dirname(self.filename), exist_ok=True)

        buffer = StringIO()
        lock = Lock()
        self.stdout = StdoutTee(filename=self.filename, buffer=buffer, buffer_size=BufferWriter.FILE_BUFFER_SIZE, lock=lock)
        self.stderr = StderrTee(filename=self.filename, buffer=buffer, buffer_size=BufferWriter.FILE_BUFFER_SIZE, lock=lock)

        # We don't want to rewrite this for subprocesses when using DDP.
        if is_main_process():
            with open(self.filename, mode="w", encoding="utf-8") as f:
                f.write("============================================================\n")
                f.write(f'New run started at {datetime.now().strftime("%Y-%m-%d.%H:%M:%S.%f")}\n')
                f.write(f'sys.argv: "{" ".join(sys.argv)}"\n')
                f.write("============================================================\n")
        self.stdout.write(f"The console stream is logged into {self.filename}\n")

    @multi_process_safe
    def _set_location(self, filename: str):
        """Copy and redirect the sink file into another location."""
        self._flush()

        prev_filename = self.filename
        copy_file(src_filename=prev_filename, dest_filename=filename, copy_mode="a")

        self.filename = filename
        self.stdout.filename = filename
        self.stderr.filename = filename
        self.stdout.write(f"The console stream is now moved to {filename}\n")

    @staticmethod
    def set_location(filename: str) -> None:
        """Copy and redirect the sink file into another location."""
        _console_sink._set_location(filename)

    @multi_process_safe
    def _flush(self):
        """Force the flush on stdout and stderr."""
        self.stdout.flush(force=True)
        self.stderr.flush(force=True)

    @staticmethod
    def flush():
        """Force the flush on stdout and stderr."""
        _console_sink._flush()

    @staticmethod
    def get_filename():
        """Get the filename of the sink."""
        return _console_sink.filename

flush() staticmethod

Force the flush on stdout and stderr.

Source code in src/super_gradients/common/auto_logging/console_logging.py
164
165
166
167
@staticmethod
def flush():
    """Force the flush on stdout and stderr."""
    _console_sink._flush()

get_filename() staticmethod

Get the filename of the sink.

Source code in src/super_gradients/common/auto_logging/console_logging.py
169
170
171
172
@staticmethod
def get_filename():
    """Get the filename of the sink."""
    return _console_sink.filename

set_location(filename) staticmethod

Copy and redirect the sink file into another location.

Source code in src/super_gradients/common/auto_logging/console_logging.py
153
154
155
156
@staticmethod
def set_location(filename: str) -> None:
    """Copy and redirect the sink file into another location."""
    _console_sink._set_location(filename)

StderrTee

Bases: BufferWriter

Duplicate the stderr stream to save it into a given file.

Source code in src/super_gradients/common/auto_logging/console_logging.py
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
class StderrTee(BufferWriter):
    """Duplicate the stderr stream to save it into a given file."""

    def __init__(self, filename: str, buffer: StringIO, buffer_size: int, lock: Lock):
        """
        :param filename:         Name of the file where to write the bugger
        :param buffer:           Buffer object
        :param buffer_size:      Number of chars to be buffered before writing the buffer on disk.
        :param lock:             Thread lock to prevent multiple threads to write at the same time
        """
        super().__init__(filename, buffer, buffer_size, lock)
        self.stderr = sys.stderr
        sys.stderr = self

    def __del__(self):
        sys.stderr = self.stderr

    def write(self, data):
        super().write(data)
        self.stderr.write(data)

    def __getattr__(self, attr):
        return getattr(self.stderr, attr)

__init__(filename, buffer, buffer_size, lock)

Parameters:

Name Type Description Default
filename str

Name of the file where to write the bugger

required
buffer StringIO

Buffer object

required
buffer_size int

Number of chars to be buffered before writing the buffer on disk.

required
lock Lock

Thread lock to prevent multiple threads to write at the same time

required
Source code in src/super_gradients/common/auto_logging/console_logging.py
55
56
57
58
59
60
61
62
63
64
def __init__(self, filename: str, buffer: StringIO, buffer_size: int, lock: Lock):
    """
    :param filename:         Name of the file where to write the bugger
    :param buffer:           Buffer object
    :param buffer_size:      Number of chars to be buffered before writing the buffer on disk.
    :param lock:             Thread lock to prevent multiple threads to write at the same time
    """
    super().__init__(filename, buffer, buffer_size, lock)
    self.stderr = sys.stderr
    sys.stderr = self

StdoutTee

Bases: BufferWriter

Duplicate the stdout stream to save it into a given file.

Source code in src/super_gradients/common/auto_logging/console_logging.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class StdoutTee(BufferWriter):
    """Duplicate the stdout stream to save it into a given file."""

    def __init__(self, filename: str, buffer, buffer_size: int, lock: Lock):
        """
        :param filename:         Name of the file where to write the bugger
        :param buffer:           Buffer object
        :param buffer_size:      Number of chars to be buffered before writing the buffer on disk.
        :param lock:             Thread lock to prevent multiple threads to write at the same time
        """
        super().__init__(filename, buffer, buffer_size, lock)
        self.stdout = sys.stdout
        sys.stdout = self

    def __del__(self):
        sys.stdout = self.stdout

    def write(self, data):
        super().write(data)
        self.stdout.write(data)

    def __getattr__(self, attr):
        return getattr(self.stdout, attr)

__init__(filename, buffer, buffer_size, lock)

Parameters:

Name Type Description Default
filename str

Name of the file where to write the bugger

required
buffer

Buffer object

required
buffer_size int

Number of chars to be buffered before writing the buffer on disk.

required
lock Lock

Thread lock to prevent multiple threads to write at the same time

required
Source code in src/super_gradients/common/auto_logging/console_logging.py
80
81
82
83
84
85
86
87
88
89
def __init__(self, filename: str, buffer, buffer_size: int, lock: Lock):
    """
    :param filename:         Name of the file where to write the bugger
    :param buffer:           Buffer object
    :param buffer_size:      Number of chars to be buffered before writing the buffer on disk.
    :param lock:             Thread lock to prevent multiple threads to write at the same time
    """
    super().__init__(filename, buffer, buffer_size, lock)
    self.stdout = sys.stdout
    sys.stdout = self

copy_file(src_filename, dest_filename, copy_mode='w')

Copy a file from source to destination. Also works when the destination folder does not exist.

Source code in src/super_gradients/common/auto_logging/console_logging.py
102
103
104
105
106
107
108
def copy_file(src_filename: str, dest_filename: str, copy_mode: str = "w"):
    """Copy a file from source to destination. Also works when the destination folder does not exist."""
    os.makedirs(os.path.dirname(dest_filename), exist_ok=True)
    if os.path.exists(src_filename):
        with open(src_filename, "r", encoding="utf-8") as src:
            with open(dest_filename, copy_mode, encoding="utf-8") as dst:
                dst.write(src.read())