SG loggers

AbstractSGLogger

Bases: ABC

A SGLogger handles all outputs of the training process. Every generated file, log, metric value, image, or other artifact produced by the trainer will be processed and saved.

Inherit from SGLogger to integrate an experiment management framework, a special storage setting, a specific logging library, etc.

Important: The BaseSGLogger class (inheriting from SGLogger) is used by the trainer by default. When defining your own SGLogger you will override all default output functionality: no files will be saved to disk and no data will be collected. Make sure you either implement this functionality or use SGLoggers.Compose([BaseSGLogger(...), YourSGLogger(...)]) to build on top of it.
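For a concrete picture of what implementing this interface involves, here is a minimal sketch of a custom logger that prints configs and scalars and drops everything else. The abstract interface comes from the source below; the PrintSGLogger name, the method bodies and the local_dir value are purely illustrative:

from typing import Any

from super_gradients.common.sg_loggers.abstract_sg_logger import AbstractSGLogger


class PrintSGLogger(AbstractSGLogger):
    """Illustrative SGLogger: prints configs and scalars, ignores every other output."""

    def __init__(self, local_dir: str = "/tmp/print_sg_logger"):
        self._local_dir = local_dir

    def add(self, tag: str, obj: Any, global_step: int = None):
        pass  # no handling of arbitrary objects

    def add_config(self, tag: str, config: dict):
        print(f"[{tag}] config: {config}")

    def add_scalar(self, tag: str, scalar_value: float, global_step=None):
        print(f"[{tag}] step={global_step} value={scalar_value}")

    def add_scalars(self, tag_scalar_dict: dict, global_step: int = None):
        for tag, value in tag_scalar_dict.items():
            self.add_scalar(tag, value, global_step)

    def add_image(self, tag: str, image, data_format: str = "CHW", global_step: int = None):
        pass  # images are dropped

    def add_images(self, tag: str, images, data_format="NCHW", global_step: int = None):
        pass

    def add_histogram(self, tag: str, values, bins="auto", global_step: int = None):
        pass

    def add_text(self, tag: str, text_string: str, global_step: int = None):
        pass

    def add_checkpoint(self, tag: str, state_dict: dict, global_step: int = None):
        pass  # a real logger would persist the state_dict here

    def add_file(self, file_name: str = None):
        pass

    def upload(self):
        pass

    def flush(self):
        pass

    def close(self):
        pass

    def local_dir(self) -> str:
        return self._local_dir

As the note above says, such a subclass on its own disables all default file outputs; in practice you would either re-implement them or compose it with BaseSGLogger.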

Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
class AbstractSGLogger(ABC):
    """
    A SGLogger handles all outputs of the training process.
    Every generated file, log, metric value, image, or other artifact produced by the trainer will be processed and saved.

    Inherit from SGLogger to integrate an experiment management framework, a special storage setting, a specific logging library, etc.

    Important: The BaseSGLogger class (inheriting from SGLogger) is used by the trainer by default. When defining your own SGLogger you will
    override all default output functionality: no files will be saved to disk and no data will be collected.
    Make sure you either implement this functionality or use SGLoggers.Compose([BaseSGLogger(...), YourSGLogger(...)]) to build on top of it.
    """

    @abstractmethod
    def add(self, tag: str, obj: Any, global_step: int = None):
        """
        A generic function for adding any type of data to the SGLogger. By default, this function is not called by the Trainer, and BaseSGLogger
        does nothing with this type of data. But if you need to pass a data type which is not supported by any of the following abstract methods, use this
        method.
        """
        raise NotImplementedError

    @abstractmethod
    def add_config(self, tag: str, config: dict):
        """
        Add the configuration (settings and hyperparameters) to the SGLogger.
        Typically, this function will add the configuration dictionary to logs,
        write it to tensorboard, send it to an experiment management framework, etc.

        :param tag: Data identifier
        :param config: a dictionary of the experiment config
        """
        raise NotImplementedError

    @abstractmethod
    def add_scalar(self, tag: str, scalar_value: float, global_step: Union[int, TimeUnit] = None):
        """
        Add scalar data to SGLogger.
        Typically, this function will add scalar to tensorboard or other experiment management framework.

        :param tag: Data identifier
        :param scalar_value: Value to save
        :param global_step: Global step value to record
        """
        raise NotImplementedError

    @abstractmethod
    def add_scalars(self, tag_scalar_dict: dict, global_step: int = None):
        """
        Adds multiple scalar data to SGLogger.
        Typically, this function will add scalars to tensorboard or other experiment management framework.

        :param tag_scalar_dict: a dictionary {tag(str): value(float)} of the scalars.
        :param global_step: Global step value to record
        """
        raise NotImplementedError

    @abstractmethod
    def add_image(self, tag: str, image: Union[torch.Tensor, np.array, Image.Image], data_format: str = "CHW", global_step: int = None):
        """
        Add a single image to SGLogger.
        Typically, this function will add an image to tensorboard, save it to disk or add it to experiment management framework.

        :param tag: Data identifier
        :param image: an image to be added. The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
        :param data_format: Image data format specification of the form CHW, HWC, HW, WH, etc.
        :param global_step: Global step value to record
        """
        raise NotImplementedError

    @abstractmethod
    def add_images(self, tag: str, images: Union[torch.Tensor, np.array], data_format="NCHW", global_step: int = None):
        """
        Add multiple images to SGLogger.
        Typically, this function will add images to tensorboard, save them to disk or add them to experiment management framework.

        :param tag: Data identifier
        :param images: images to be added. The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
        :param data_format: Image data format specification of the form NCHW, NHWC, NHW, NWH, etc.
        :param global_step: Global step value to record
        """
        raise NotImplementedError

    @abstractmethod
    def add_histogram(self, tag: str, values: Union[torch.Tensor, np.array], bins: Union[str, np.array, list, int] = "auto", global_step: int = None):
        """
        Add a histogram to SGLogger.
        Typically, this function will add a histogram to tensorboard or add it to experiment management framework.

        :param tag: Data identifier
        :param values: Values to build histogram
        :param bins: This determines how the bins are made.
            If bins is an int, it defines the number of equal-width bins in the given range
            If bins is a sequence, it defines a monotonically increasing array of bin edges, including the rightmost edge, allowing for non-uniform bin widths.
            If bins is a string, it defines the method used to calculate the optimal bin width, as defined by
            https://numpy.org/doc/stable/reference/generated/numpy.histogram_bin_edges.html#numpy.histogram_bin_edges
            one of ['sqrt', 'auto', 'fd', 'doane', 'scott', 'stone', ...]
        :param global_step: Global step value to record
        """
        raise NotImplementedError

    @abstractmethod
    def add_text(self, tag: str, text_string: str, global_step: int = None):
        """
        Add a text to SGLogger.
        Typically, this function will add a text to tensorboard or add it to experiment management framework.

        :param tag: Data identifier
        :param text_string: the text to be added
        :param global_step: Global step value to record
        """
        raise NotImplementedError

    @abstractmethod
    def add_checkpoint(self, tag: str, state_dict: dict, global_step: int = None):
        """
        Add a checkpoint to SGLogger
        Typically, this function will write a torch file to disk, upload it to remote storage or to experiment management framework.

        :param tag: Data identifier
        :param state_dict: the state dict to save. The state dict includes more than just the model weights and may include any of:
                net: model weights
                acc: current accuracy (depends on metrics)
                epoch: current epoch
                optimizer_state_dict: optimizer state
                scaler_state_dict: torch.amp scaler state
        :param global_step: Global step value to record
        """
        raise NotImplementedError

    @abstractmethod
    def add_file(self, file_name: str = None):
        """
        Add a file from the checkpoint directory to the logger (usually, uploads the file or adds it to an artifact)
        """
        raise NotImplementedError

    @abstractmethod
    def upload(self):
        """
        Upload any files which should be stored on remote storage
        """
        raise NotImplementedError

    @abstractmethod
    def flush(self):
        """
        Flush the SGLogger's cache
        """
        raise NotImplementedError

    @abstractmethod
    def close(self):
        """
        Close the SGLogger
        """
        raise NotImplementedError

    @abstractmethod
    def local_dir(self) -> str:
        """
        A getter for the full/absolute path where all files are saved locally
        :return: the full/absolute local path where all files are saved
        """
        raise NotImplementedError

    def download_remote_ckpt(self, ckpt_name: str, *args, **kwargs):
        raise NotImplementedError

add(tag, obj, global_step=None) abstractmethod

A generic function for adding any type of data to the SGLogger. By default, this function is not called by the Trainer, and BaseSGLogger does nothing with this type of data. But if you need to pass a data type which is not supported by any of the following abstract methods, use this method.

Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add(self, tag: str, obj: Any, global_step: int = None):
    """
    A generic function for adding any type of data to the SGLogger. By default, this function is not called by the Trainer, and BaseSGLogger
    does nothing with this type of data. But if you need to pass a data type which is not supported by any of the following abstract methods, use this
    method.
    """
    raise NotImplementedError

add_checkpoint(tag, state_dict, global_step=None) abstractmethod

Add a checkpoint to SGLogger. Typically, this function will write a torch file to disk, upload it to remote storage or to an experiment management framework.

Parameters:

    tag (str, required): Data identifier
    state_dict (dict, required): the state dict to save. The state dict includes more than just the model weights and may include any of:
        net: model weights
        acc: current accuracy (depends on metrics)
        epoch: current epoch
        optimizer_state_dict: optimizer state
        scaler_state_dict: torch.amp scaler state
    global_step (int, default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add_checkpoint(self, tag: str, state_dict: dict, global_step: int = None):
    """
    Add a checkpoint to SGLogger
    Typically, this function will write a torch file to disk, upload it to remote storage or to experiment management framework.

    :param tag: Data identifier
    :param state_dict: the state dict to save. The state dict includes more than just the model weights and may include any of:
            net: model weights
            acc: current accuracy (depends on metrics)
            epoch: current epoch
            optimizer_state_dict: optimizer state
            scaler_state_dict: torch.amp scaler state
    :param global_step: Global step value to record
    """
    raise NotImplementedError
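For illustration, a checkpoint call might assemble a state dict with the keys listed above. The model, optimizer and values here are made up, and sg_logger stands for any concrete SGLogger instance (e.g. the one the Trainer holds):

import torch
import torch.nn as nn

model = nn.Linear(10, 2)
optimizer = torch.optim.SGD(model.parameters(), lr=0.1)

state_dict = {
    "net": model.state_dict(),                       # model weights
    "acc": 0.87,                                     # current metric value
    "epoch": 5,                                      # current epoch
    "optimizer_state_dict": optimizer.state_dict(),  # optimizer state
}
sg_logger.add_checkpoint(tag="ckpt_epoch_5.pth", state_dict=state_dict, global_step=5)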

add_config(tag, config) abstractmethod

Add the configuration (settings and hyperparameters) to the SGLogger. Typically, this function will add the configuration dictionary to logs, write it to tensorboard, send it to an experiment management framework, etc.

Parameters:

    tag (str, required): Data identifier
    config (dict, required): a dictionary of the experiment config
Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add_config(self, tag: str, config: dict):
    """
    Add the configuration (settings and hyperparameters) to the SGLogger.
    Typically, this function will add the configuration dictionary to logs,
    write it to tensorboard, send it to an experiment management framework, etc.

    :param tag: Data identifier
    :param config: a dictionary of the experiment config
    """
    raise NotImplementedError
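For illustration, a call might pass the experiment's hyperparameters as a plain dict (the values are made up; sg_logger stands for any concrete SGLogger instance):

experiment_config = {
    "optimizer": "SGD",   # illustrative values
    "initial_lr": 0.1,
    "batch_size": 64,
    "max_epochs": 100,
}
sg_logger.add_config(tag="hyperparameters", config=experiment_config)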

add_file(file_name=None) abstractmethod

Add a file from the checkpoint directory to the logger (usually, uploads the file or adds it to an artifact)

Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add_file(self, file_name: str = None):
    """
    Add a file from the checkpoint directory to the logger (usually, uploads the file or adds it to an artifact)
    """
    raise NotImplementedError

add_histogram(tag, values, bins='auto', global_step=None) abstractmethod

Add a histogram to SGLogger. Typically, this function will add a histogram to tensorboard or add it to experiment management framework.

Parameters:

    tag (str, required): Data identifier
    values (Union[torch.Tensor, np.array], required): Values to build histogram
    bins (Union[str, np.array, list, int], default 'auto'): Determines how the bins are made.
        If bins is an int, it defines the number of equal-width bins in the given range.
        If bins is a sequence, it defines a monotonically increasing array of bin edges, including the rightmost edge, allowing for non-uniform bin widths.
        If bins is a string, it defines the method used to calculate the optimal bin width, as defined by
        https://numpy.org/doc/stable/reference/generated/numpy.histogram_bin_edges.html#numpy.histogram_bin_edges
        (one of ['sqrt', 'auto', 'fd', 'doane', 'scott', 'stone', ...])
    global_step (int, default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add_histogram(self, tag: str, values: Union[torch.Tensor, np.array], bins: Union[str, np.array, list, int] = "auto", global_step: int = None):
    """
    Add a histogram to SGLogger.
    Typically, this function will add a histogram to tensorboard or add it to experiment management framework.

    :param tag: Data identifier
    :param values: Values to build histogram
    :param bins: This determines how the bins are made.
        If bins is an int, it defines the number of equal-width bins in the given range
        If bins is a sequence, it defines a monotonically increasing array of bin edges, including the rightmost edge, allowing for non-uniform bin widths.
        If bins is a string, it defines the method used to calculate the optimal bin width, as defined by
        https://numpy.org/doc/stable/reference/generated/numpy.histogram_bin_edges.html#numpy.histogram_bin_edges
        one of ['sqrt', 'auto', 'fd', 'doane', 'scott', 'stone', ...]
    :param global_step: Global step value to record
    """
    raise NotImplementedError
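The three accepted forms of bins could be exercised like this (random data for illustration; sg_logger stands for any concrete SGLogger instance):

import numpy as np

values = np.random.randn(10_000)

sg_logger.add_histogram("fc1/weights", values, bins=50, global_step=0)                           # 50 equal-width bins
sg_logger.add_histogram("fc1/weights", values, bins=[-3.0, -1.0, 0.0, 1.0, 3.0], global_step=0)  # explicit bin edges
sg_logger.add_histogram("fc1/weights", values, bins="auto", global_step=0)                       # numpy chooses the width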

add_image(tag, image, data_format='CHW', global_step=None) abstractmethod

Add a single image to SGLogger. Typically, this function will add an image to tensorboard, save it to disk or add it to experiment management framework.

Parameters:

    tag (str, required): Data identifier
    image (Union[torch.Tensor, np.array, Image.Image], required): an image to be added. The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
    data_format (str, default 'CHW'): Image data format specification of the form CHW, HWC, HW, WH, etc.
    global_step (int, default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add_image(self, tag: str, image: Union[torch.Tensor, np.array, Image.Image], data_format: str = "CHW", global_step: int = None):
    """
    Add a single image to SGLogger.
    Typically, this function will add an image to tensorboard, save it to disk or add it to experiment management framework.

    :param tag: Data identifier
    :param image: an image to be added. The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
    :param data_format: Image data format specification of the form CHW, HWC, HW, WH, etc.
    :param global_step: Global step value to record
    """
    raise NotImplementedError
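The data_format argument should match the memory layout of whatever you pass in, for example (illustrative tensors; sg_logger stands for any concrete SGLogger instance):

import numpy as np
import torch

chw = torch.rand(3, 224, 224)                                        # float image in [0, 1], channels first
hwc = np.random.randint(0, 256, size=(224, 224, 3), dtype=np.uint8)  # uint8 image in [0, 255], channels last

sg_logger.add_image("samples/torch_chw", chw, data_format="CHW", global_step=0)
sg_logger.add_image("samples/numpy_hwc", hwc, data_format="HWC", global_step=0)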

add_images(tag, images, data_format='NCHW', global_step=None) abstractmethod

Add multiple images to SGLogger. Typically, this function will add images to tensorboard, save them to disk or add them to experiment management framework.

Parameters:

    tag (str, required): Data identifier
    images (Union[torch.Tensor, np.array], required): images to be added. The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
    data_format (default 'NCHW'): Image data format specification of the form NCHW, NHWC, NHW, NWH, etc.
    global_step (int, default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add_images(self, tag: str, images: Union[torch.Tensor, np.array], data_format="NCHW", global_step: int = None):
    """
    Add multiple images to SGLogger.
    Typically, this function will add images to tensorboard, save them to disk or add them to experiment management framework.

    :param tag: Data identifier
    :param images: images to be added. The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
    :param data_format: Image data format specification of the form NCHW, NHWC, NHW, NWH, etc.
    :param global_step: Global step value to record
    """
    raise NotImplementedError

add_scalar(tag, scalar_value, global_step=None) abstractmethod

Add scalar data to SGLogger. Typically, this function will add scalar to tensorboard or other experiment management framework.

Parameters:

    tag (str, required): Data identifier
    scalar_value (float, required): Value to save
    global_step (Union[int, TimeUnit], default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add_scalar(self, tag: str, scalar_value: float, global_step: Union[int, TimeUnit] = None):
    """
    Add scalar data to SGLogger.
    Typically, this function will add scalar to tensorboard or other experiment management framework.

    :param tag: Data identifier
    :param scalar_value: Value to save
    :param global_step: Global step value to record
    """
    raise NotImplementedError

add_scalars(tag_scalar_dict, global_step=None) abstractmethod

Adds multiple scalar data to SGLogger. Typically, this function will add scalars to tensorboard or other experiment management framework.

Parameters:

    tag_scalar_dict (dict, required): a dictionary {tag(str): value(float)} of the scalars
    global_step (int, default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add_scalars(self, tag_scalar_dict: dict, global_step: int = None):
    """
    Adds multiple scalar data to SGLogger.
    Typically, this function will add scalars to tensorboard or other experiment management framework.

    :param tag_scalar_dict: a dictionary {tag(str): value(float)} of the scalars.
    :param global_step: Global step value to record
    """
    raise NotImplementedError
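For example, per-epoch metrics can be logged one at a time or in a single call (illustrative values; sg_logger stands for any concrete SGLogger instance):

sg_logger.add_scalar(tag="train_loss", scalar_value=0.412, global_step=3)
sg_logger.add_scalars(tag_scalar_dict={"train_loss": 0.412, "valid_loss": 0.518}, global_step=3)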

add_text(tag, text_string, global_step=None) abstractmethod

Add a text to SGLogger. Typically, this function will add a text to tensorboard or add it to experiment management framework.

Parameters:

    tag (str, required): Data identifier
    text_string (str, required): the text to be added
    global_step (int, default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def add_text(self, tag: str, text_string: str, global_step: int = None):
    """
    Add a text to SGLogger.
    Typically, this function will add a text to tensorboard or add it to experiment management framework.

    :param tag: Data identifier
    :param text_string: the text to be added
    :param global_step: Global step value to record
    """
    raise NotImplementedError

close() abstractmethod

Close the SGLogger

Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def close(self):
    """
    Close the SGLogger
    """
    raise NotImplementedError

flush() abstractmethod

Flush the SGLogger's cache

Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def flush(self):
    """
    Flush the SGLogger's cache
    """
    raise NotImplementedError

local_dir() abstractmethod

A getter for the full/absolute path where all files are saved locally

Returns:

    str: the full/absolute path where all files are saved locally
Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def local_dir(self) -> str:
    """
    A getter for the full/absolute path where all files are saved locally
    :return: the full/absolute local path where all files are saved
    """
    raise NotImplementedError

upload() abstractmethod

Upload any files which should be stored on remote storage

Source code in src/super_gradients/common/sg_loggers/abstract_sg_logger.py
@abstractmethod
def upload(self):
    """
    Upload any files which should be stored on remote storage
    """
    raise NotImplementedError

BaseSGLogger

Bases: AbstractSGLogger

Source code in src/super_gradients/common/sg_loggers/base_sg_logger.py
@register_sg_logger("base_sg_logger")
class BaseSGLogger(AbstractSGLogger):
    def __init__(
        self,
        project_name: str,
        experiment_name: str,
        storage_location: str,
        resumed: bool,
        training_params: TrainingParams,
        checkpoints_dir_path: str,
        tb_files_user_prompt: bool = False,
        launch_tensorboard: bool = False,
        tensorboard_port: int = None,
        save_checkpoints_remote: bool = True,
        save_tensorboard_remote: bool = True,
        save_logs_remote: bool = True,
        monitor_system: bool = True,
    ):
        """

        :param experiment_name:         Name used for logging and loading purposes
        :param storage_location:        If set to 's3' (e.g. s3://my-bucket) saves the checkpoints in AWS S3, otherwise saves the checkpoints locally
        :param resumed:                 If true, then old tensorboard files will **NOT** be deleted when tb_files_user_prompt=True
        :param training_params:         training_params for the experiment.
        :param checkpoints_dir_path:    Local root directory path where all experiment logging directories will reside.
        :param tb_files_user_prompt:    Asks user for Tensorboard deletion prompt.
        :param launch_tensorboard:      Whether to launch a TensorBoard process.
        :param tensorboard_port:        Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
        :param save_checkpoints_remote: Saves checkpoints in s3.
        :param save_tensorboard_remote: Saves tensorboard in s3.
        :param save_logs_remote:        Saves log files in s3.
        :param monitor_system:          Save the system statistics (GPU utilization, CPU, ...) in the tensorboard
        """
        super().__init__()
        self.project_name = project_name
        self.experiment_name = experiment_name
        self.storage_location = storage_location

        if storage_location.startswith("s3"):
            self.save_checkpoints_remote = save_checkpoints_remote
            self.save_tensorboard_remote = save_tensorboard_remote
            self.save_logs_remote = save_logs_remote
            self.remote_storage_available = True
        else:
            self.remote_storage_available = False
            if save_checkpoints_remote:
                logger.error("save_checkpoints_remote == True but storage_location is not s3 path. Files will not be saved remotely")
            if save_tensorboard_remote:
                logger.error("save_tensorboard_remote == True but storage_location is not s3 path. Files will not be saved remotely")
            if save_logs_remote:
                logger.error("save_logs_remote == True but storage_location is not s3 path. Files will not be saved remotely")

            self.save_checkpoints_remote = False
            self.save_tensorboard_remote = False
            self.save_logs_remote = False

        self.tensor_board_process = None
        self.max_global_steps = training_params.max_epochs
        self._local_dir = checkpoints_dir_path

        self._setup_dir()
        self._init_tensorboard(resumed, tb_files_user_prompt)
        self._init_log_file()

        self.model_checkpoints_data_interface = ADNNModelRepositoryDataInterfaces(data_connection_location=self.storage_location)

        if launch_tensorboard:
            self._launch_tensorboard(port=tensorboard_port)

        self._init_system_monitor(monitor_system)

        self._save_code()
        self._resume_from_remote_sg_logger = get_param(training_params, "resume_from_remote_sg_logger")

    @multi_process_safe
    def _launch_tensorboard(self, port):
        self.tensor_board_process, _ = sg_trainer_utils.launch_tensorboard_process(self._local_dir, port=port)

    @multi_process_safe
    def _init_tensorboard(self, resumed, tb_files_user_prompt):
        self.tensorboard_writer = sg_trainer_utils.init_summary_writer(self._local_dir, resumed, tb_files_user_prompt)

    @multi_process_safe
    def _init_system_monitor(self, monitor_system: bool):
        if monitor_system:
            self.system_monitor = SystemMonitor.start(tensorboard_writer=self.tensorboard_writer)
        else:
            self.system_monitor = None

    @multi_process_safe
    def _setup_dir(self):
        if not os.path.isdir(self._local_dir):
            os.makedirs(self._local_dir)

        # If we are not logging in the root of the experiment directory, and instead we do in a run directory,
        # we need to ensure that we copy the `.hydra` folder
        if is_run_dir(self._local_dir):
            source_hydra_path = os.path.join(os.path.dirname(self._local_dir), ".hydra")
            # Only if it exists, i.e. if hydra was used.
            if os.path.exists(source_hydra_path):
                destination_hydra_path = os.path.join(self._local_dir, ".hydra")
                if not os.path.exists(destination_hydra_path):
                    shutil.copytree(source_hydra_path, destination_hydra_path)

    @multi_process_safe
    def _init_log_file(self):
        time_string = time.strftime("%b%d_%H_%M_%S", time.localtime())

        # Where the experiment related info will be saved (config and training/validation results per epoch)
        self.experiment_log_path = f"{self._local_dir}/{EXPERIMENT_LOGS_PREFIX}_{time_string}.txt"

        # Where the logger.log will be saved
        self.logs_path = f"{self._local_dir}/{LOGGER_LOGS_PREFIX}_{time_string}.txt"

        # Where the console prints/logs will be saved
        self.console_sink_path = f"{self._local_dir}/{CONSOLE_LOGS_PREFIX}_{time_string}.txt"

        AutoLoggerConfig.setup_logging(filename=self.logs_path, copy_already_logged_messages=True)
        ConsoleSink.set_location(filename=self.console_sink_path)

    @multi_process_safe
    def _write_to_log_file(self, lines: list):
        with open(self.experiment_log_path, "a" if os.path.exists(self.experiment_log_path) else "w") as log_file:
            for line in lines:
                log_file.write(line + "\n")

    @multi_process_safe
    def add_config(self, tag: str, config: dict):
        log_lines = ["--------- config parameters ----------"]
        log_lines.append(json.dumps(config, indent=4, default=str))
        log_lines.append("------- config parameters end --------")

        # Non-breaking spaces and trailing double-spaces preserve the JSON indentation and line breaks in TensorBoard's markdown rendering
        self.tensorboard_writer.add_text(tag, json.dumps(config, indent=4, default=str).replace(" ", "\u00a0").replace("\n", "  \n  "))
        self._write_to_log_file(log_lines)

    @multi_process_safe
    def add_scalar(self, tag: str, scalar_value: float, global_step: Union[int, TimeUnit] = None):
        if isinstance(global_step, TimeUnit):
            global_step = global_step.get_value()
        self.tensorboard_writer.add_scalar(tag=tag.lower().replace(" ", "_"), scalar_value=scalar_value, global_step=global_step)

    @multi_process_safe
    def add_scalars(self, tag_scalar_dict: dict, global_step: int = None):
        """
        Add multiple scalars.
        Unlike the TensorBoard implementation, this does not add all scalars under a single main tag (i.e. all scalars on the same chart).
        Instead, each scalar is added to tensorboard as in add_scalar, and they are all written to the log file together.
        """
        for tag, value in tag_scalar_dict.items():
            self.tensorboard_writer.add_scalar(tag=tag.lower().replace(" ", "_"), scalar_value=value, global_step=global_step)

        self.tensorboard_writer.flush()

        # WRITE THE EPOCH RESULTS TO LOG FILE
        log_line = f"\nEpoch {global_step} ({global_step+1}/{self.max_global_steps})  - "
        for tag, value in tag_scalar_dict.items():
            if isinstance(value, torch.Tensor):
                value = value.item()
            log_line += f'{tag.replace(" ", "_")}: {value}\t'

        self._write_to_log_file([log_line])

    @multi_process_safe
    def add_image(self, tag: str, image: Union[torch.Tensor, np.array, Image.Image], data_format="CHW", global_step: int = None):
        self.tensorboard_writer.add_image(tag=tag, img_tensor=image, dataformats=data_format, global_step=global_step)

    @multi_process_safe
    def add_images(self, tag: str, images: Union[torch.Tensor, np.array], data_format="NCHW", global_step: int = None):
        """
        Add multiple images to SGLogger.
        Typically, this function will add a set of images to tensorboard, save them to disk or add them to an experiment management framework.

        :param tag: Data identifier
        :param images: images to be added. The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
        :param data_format: Image data format specification of the form NCHW, NHWC, CHW, HWC, HW, WH, etc.
        :param global_step: Global step value to record
        """
        self.tensorboard_writer.add_images(tag=tag, img_tensor=images, dataformats=data_format, global_step=global_step)

    @multi_process_safe
    def add_video(self, tag: str, video: Union[torch.Tensor, np.array], global_step: int = None):
        """
        Add a single video to SGLogger.
        Typically, this function will add a video to tensorboard, save it to disk or add it to experiment management framework.

        :param tag: Data identifier
        :param video: the video to add. shape (N,T,C,H,W) or (T,C,H,W). The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
        :param global_step: Global step value to record
        """
        if video.ndim < 5:
            video = video[None]  # add a leading batch dimension: (T, C, H, W) -> (1, T, C, H, W)
        self.tensorboard_writer.add_video(tag=tag, video=video, global_step=global_step)

    @multi_process_safe
    def add_histogram(self, tag: str, values: Union[torch.Tensor, np.array], bins: str, global_step: int = None):
        self.tensorboard_writer.add_histogram(tag=tag, values=values, bins=bins, global_step=global_step)

    @multi_process_safe
    def add_model_graph(self, tag: str, model: torch.nn.Module, dummy_input: torch.Tensor):
        """
        Add a pytorch model graph to the SGLogger.
        Only the model structure/architecture will be preserved and collected, NOT the model weights.

        :param tag: Data identifier
        :param model: the model to be added
        :param dummy_input: an input to be used for a forward call on the model
        """
        self.tensorboard_writer.add_graph(model=model, input_to_model=dummy_input)

    @multi_process_safe
    def add_text(self, tag: str, text_string: str, global_step: int = None):
        self.tensorboard_writer.add_text(tag=tag, text_string=text_string, global_step=global_step)

    @multi_process_safe
    def add_figure(self, tag: str, figure: plt.figure, global_step: int = None):
        """
        Add a figure to SGLogger.
        Typically, this function will add a figure to tensorboard or add it to experiment management framework.

        :param tag: Data identifier
        :param figure: the figure to add
        :param global_step: Global step value to record
        """
        self.tensorboard_writer.add_figure(tag=tag, figure=figure, global_step=global_step)

    @multi_process_safe
    def add_file(self, file_name: str = None):
        if self.remote_storage_available:
            self.model_checkpoints_data_interface.save_remote_tensorboard_event_files(self.experiment_name, self._local_dir, file_name)

    @multi_process_safe
    def upload(self):
        """Upload the local tensorboard and log files to remote system."""
        self.flush()

        if self.save_tensorboard_remote:
            self.model_checkpoints_data_interface.save_remote_tensorboard_event_files(self.experiment_name, self._local_dir)

        if self.save_logs_remote:
            log_file_name = self.experiment_log_path.split("/")[-1]
            self.model_checkpoints_data_interface.save_remote_checkpoints_file(self.experiment_name, self._local_dir, log_file_name)

    @multi_process_safe
    def flush(self):
        self.tensorboard_writer.flush()
        ConsoleSink.flush()

    @multi_process_safe
    def close(self):
        self.upload()

        if self.system_monitor is not None:
            self.system_monitor.close()
            logger.info("[CLEANUP] - Successfully stopped system monitoring process")

        self.tensorboard_writer.close()
        if self.tensor_board_process is not None:
            try:
                logger.info("[CLEANUP] - Stopping tensorboard process")
                process = psutil.Process(self.tensor_board_process.pid)
                process.send_signal(signal.SIGTERM)
                logger.info("[CLEANUP] - Successfully stopped tensorboard process")
            except Exception as ex:
                logger.info("[CLEANUP] - Could not stop tensorboard process properly: " + str(ex))

    @multi_process_safe
    def add_checkpoint(self, tag: str, state_dict: dict, global_step: int = None) -> None:
        """Add checkpoint to experiment folder.

        :param tag:         Identifier of the checkpoint. If None, global_step will be used to name the checkpoint.
        :param state_dict:  Checkpoint state_dict.
        :param global_step: Epoch number.
        """
        name = f"ckpt_{global_step}.pth" if tag is None else tag
        if not name.endswith(".pth"):
            name += ".pth"
        path = os.path.join(self._local_dir, name)

        state_dict = self._sanitize_checkpoint(state_dict)
        self._save_checkpoint(path=path, state_dict=state_dict)

    @multi_process_safe
    def _save_checkpoint(self, path: str, state_dict: dict) -> None:
        """Save the Checkpoint locally.

        :param path:        Full path of the checkpoint
        :param state_dict:  State dict of the checkpoint
        """

        name = os.path.basename(path)
        torch.save(state_dict, path)
        if "best" in name:
            logger.info("Checkpoint saved in " + path)
        if self.save_checkpoints_remote:
            self.model_checkpoints_data_interface.save_remote_checkpoints_file(self.experiment_name, self._local_dir, name)

    def add(self, tag: str, obj: Any, global_step: int = None):
        pass

    def local_dir(self) -> str:
        return self._local_dir

    @multi_process_safe
    def _save_code(self):
        for name, code in saved_codes.items():
            if not name.endswith("py"):
                name = name + ".py"

            path = os.path.join(self._local_dir, name)
            with open(path, "w") as f:
                f.write(code)

            self.add_file(name)
            code = "\t" + code
            self.add_text(name, code.replace("\n", "  \n  \t"))  # this replacement makes tb format the code as code

    def _sanitize_checkpoint(self, state_dict: dict) -> dict:
        """
        Sanitize state dictionary to be saved in a checkpoint. Iterates recursively over the state_dict and converts
        all instances of ListConfig and DictConfig to their native python counterparts.

        :param state_dict:  Checkpoint state_dict.
        :return:            Sanitized checkpoint state_dict.
        """
        if isinstance(state_dict, (ListConfig, DictConfig)):
            state_dict = OmegaConf.to_container(state_dict, resolve=True)

        if isinstance(state_dict, torch.Tensor):
            pass
        elif isinstance(state_dict, collections.OrderedDict):
            state_dict = collections.OrderedDict((k, self._sanitize_checkpoint(v)) for k, v in state_dict.items())
        elif isinstance(state_dict, dict):
            state_dict = dict((k, self._sanitize_checkpoint(v)) for k, v in state_dict.items())
        elif isinstance(state_dict, list):
            state_dict = [self._sanitize_checkpoint(v) for v in state_dict]
        elif isinstance(state_dict, tuple):
            state_dict = tuple(self._sanitize_checkpoint(v) for v in state_dict)
        else:
            pass

        return state_dict

__init__(project_name, experiment_name, storage_location, resumed, training_params, checkpoints_dir_path, tb_files_user_prompt=False, launch_tensorboard=False, tensorboard_port=None, save_checkpoints_remote=True, save_tensorboard_remote=True, save_logs_remote=True, monitor_system=True)

Parameters:

    experiment_name (str, required): Name used for logging and loading purposes
    storage_location (str, required): If set to 's3' (e.g. s3://my-bucket) saves the checkpoints in AWS S3, otherwise saves the checkpoints locally
    resumed (bool, required): If true, then old tensorboard files will NOT be deleted when tb_files_user_prompt=True
    training_params (TrainingParams, required): training_params for the experiment
    checkpoints_dir_path (str, required): Local root directory path where all experiment logging directories will reside
    tb_files_user_prompt (bool, default False): Asks user for Tensorboard deletion prompt
    launch_tensorboard (bool, default False): Whether to launch a TensorBoard process
    tensorboard_port (int, default None): Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
    save_checkpoints_remote (bool, default True): Saves checkpoints in s3
    save_tensorboard_remote (bool, default True): Saves tensorboard in s3
    save_logs_remote (bool, default True): Saves log files in s3
    monitor_system (bool, default True): Save the system statistics (GPU utilization, CPU, ...) in the tensorboard
Source code in src/super_gradients/common/sg_loggers/base_sg_logger.py
def __init__(
    self,
    project_name: str,
    experiment_name: str,
    storage_location: str,
    resumed: bool,
    training_params: TrainingParams,
    checkpoints_dir_path: str,
    tb_files_user_prompt: bool = False,
    launch_tensorboard: bool = False,
    tensorboard_port: int = None,
    save_checkpoints_remote: bool = True,
    save_tensorboard_remote: bool = True,
    save_logs_remote: bool = True,
    monitor_system: bool = True,
):
    """

    :param experiment_name:         Name used for logging and loading purposes
    :param storage_location:        If set to 's3' (e.g. s3://my-bucket) saves the checkpoints in AWS S3, otherwise saves the checkpoints locally
    :param resumed:                 If true, then old tensorboard files will **NOT** be deleted when tb_files_user_prompt=True
    :param training_params:         training_params for the experiment.
    :param checkpoints_dir_path:    Local root directory path where all experiment logging directories will reside.
    :param tb_files_user_prompt:    Asks user for Tensorboard deletion prompt.
    :param launch_tensorboard:      Whether to launch a TensorBoard process.
    :param tensorboard_port:        Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
    :param save_checkpoints_remote: Saves checkpoints in s3.
    :param save_tensorboard_remote: Saves tensorboard in s3.
    :param save_logs_remote:        Saves log files in s3.
    :param monitor_system:          Save the system statistics (GPU utilization, CPU, ...) in the tensorboard
    """
    super().__init__()
    self.project_name = project_name
    self.experiment_name = experiment_name
    self.storage_location = storage_location

    if storage_location.startswith("s3"):
        self.save_checkpoints_remote = save_checkpoints_remote
        self.save_tensorboard_remote = save_tensorboard_remote
        self.save_logs_remote = save_logs_remote
        self.remote_storage_available = True
    else:
        self.remote_storage_available = False
        if save_checkpoints_remote:
            logger.error("save_checkpoints_remote == True but storage_location is not s3 path. Files will not be saved remotely")
        if save_tensorboard_remote:
            logger.error("save_tensorboard_remote == True but storage_location is not s3 path. Files will not be saved remotely")
        if save_logs_remote:
            logger.error("save_logs_remote == True but storage_location is not s3 path. Files will not be saved remotely")

        self.save_checkpoints_remote = False
        self.save_tensorboard_remote = False
        self.save_logs_remote = False

    self.tensor_board_process = None
    self.max_global_steps = training_params.max_epochs
    self._local_dir = checkpoints_dir_path

    self._setup_dir()
    self._init_tensorboard(resumed, tb_files_user_prompt)
    self._init_log_file()

    self.model_checkpoints_data_interface = ADNNModelRepositoryDataInterfaces(data_connection_location=self.storage_location)

    if launch_tensorboard:
        self._launch_tensorboard(port=tensorboard_port)

    self._init_system_monitor(monitor_system)

    self._save_code()
    self._resume_from_remote_sg_logger = get_param(training_params, "resume_from_remote_sg_logger")
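A minimal local-only construction might look as follows. This is a sketch based on the signature above; training_params is assumed to be the experiment's TrainingParams object (max_epochs is read from it), and the paths and names are made up:

from super_gradients.common.sg_loggers.base_sg_logger import BaseSGLogger

sg_logger = BaseSGLogger(
    project_name="demo_project",
    experiment_name="demo_experiment",
    storage_location="local",          # anything that does not start with "s3" keeps all files local
    resumed=False,
    training_params=training_params,   # assumed to exist; max_epochs is taken from it
    checkpoints_dir_path="/tmp/demo_experiment",
    save_checkpoints_remote=False,     # avoid the "not s3 path" error logs described above
    save_tensorboard_remote=False,
    save_logs_remote=False,
)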

add_checkpoint(tag, state_dict, global_step=None)

Add checkpoint to experiment folder.

Parameters:

    tag (str, required): Identifier of the checkpoint. If None, global_step will be used to name the checkpoint.
    state_dict (dict, required): Checkpoint state_dict
    global_step (int, default None): Epoch number
Source code in src/super_gradients/common/sg_loggers/base_sg_logger.py
@multi_process_safe
def add_checkpoint(self, tag: str, state_dict: dict, global_step: int = None) -> None:
    """Add checkpoint to experiment folder.

    :param tag:         Identifier of the checkpoint. If None, global_step will be used to name the checkpoint.
    :param state_dict:  Checkpoint state_dict.
    :param global_step: Epoch number.
    """
    name = f"ckpt_{global_step}.pth" if tag is None else tag
    if not name.endswith(".pth"):
        name += ".pth"
    path = os.path.join(self._local_dir, name)

    state_dict = self._sanitize_checkpoint(state_dict)
    self._save_checkpoint(path=path, state_dict=state_dict)

add_figure(tag, figure, global_step=None)

Add a figure to SGLogger. Typically, this function will add a figure to tensorboard or add it to experiment management framework.

Parameters:

    tag (str, required): Data identifier
    figure (plt.figure, required): the figure to add
    global_step (int, default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/base_sg_logger.py
@multi_process_safe
def add_figure(self, tag: str, figure: plt.figure, global_step: int = None):
    """
    Add a figure to SGLogger.
    Typically, this function will add a figure to tensorboard or add it to experiment management framework.

    :param tag: Data identifier
    :param figure: the figure to add
    :param global_step: Global step value to record
    """
    self.tensorboard_writer.add_figure(tag=tag, figure=figure, global_step=global_step)

add_images(tag, images, data_format='NCHW', global_step=None)

Add multiple images to SGLogger. Typically, this function will add a set of images to tensorboard, save them to disk or add them to an experiment management framework.

Parameters:

    tag (str, required): Data identifier
    images (Union[torch.Tensor, np.array], required): images to be added. The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
    data_format (default 'NCHW'): Image data format specification of the form NCHW, NHWC, CHW, HWC, HW, WH, etc.
    global_step (int, default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/base_sg_logger.py
@multi_process_safe
def add_images(self, tag: str, images: Union[torch.Tensor, np.array], data_format="NCHW", global_step: int = None):
    """
    Add multiple images to SGLogger.
    Typically, this function will add a set of images to tensorboard, save them to disk or add them to an experiment management framework.

    :param tag: Data identifier
    :param images: images to be added. The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
    :param data_format: Image data format specification of the form NCHW, NHWC, CHW, HWC, HW, WH, etc.
    :param global_step: Global step value to record
    """
    self.tensorboard_writer.add_images(tag=tag, img_tensor=images, dataformats=data_format, global_step=global_step)

add_model_graph(tag, model, dummy_input)

Add a pytorch model graph to the SGLogger. Only the model structure/architecture will be preserved and collected, NOT the model weights.

Parameters:

    tag (str, required): Data identifier
    model (torch.nn.Module, required): the model to be added
    dummy_input (torch.Tensor, required): an input to be used for a forward call on the model
Source code in src/super_gradients/common/sg_loggers/base_sg_logger.py
@multi_process_safe
def add_model_graph(self, tag: str, model: torch.nn.Module, dummy_input: torch.Tensor):
    """
    Add a pytorch model graph to the SGLogger.
    Only the model structure/architecture will be preserved and collected, NOT the model weights.

    :param tag: Data identifier
    :param model: the model to be added
    :param dummy_input: an input to be used for a forward call on the model
    """
    self.tensorboard_writer.add_graph(model=model, input_to_model=dummy_input)

add_scalars(tag_scalar_dict, global_step=None)

Add multiple scalars. Unlike the TensorBoard implementation, this does not add all scalars under a single main tag (i.e. all scalars on the same chart). Instead, each scalar is added to tensorboard as in add_scalar, and they are all written to the log file together.

Source code in src/super_gradients/common/sg_loggers/base_sg_logger.py
@multi_process_safe
def add_scalars(self, tag_scalar_dict: dict, global_step: int = None):
    """
    Add multiple scalars.
    Unlike the TensorBoard implementation, this does not add all scalars under a single main tag (i.e. all scalars on the same chart).
    Instead, each scalar is added to tensorboard as in add_scalar, and they are all written to the log file together.
    """
    for tag, value in tag_scalar_dict.items():
        self.tensorboard_writer.add_scalar(tag=tag.lower().replace(" ", "_"), scalar_value=value, global_step=global_step)

    self.tensorboard_writer.flush()

    # WRITE THE EPOCH RESULTS TO LOG FILE
    log_line = f"\nEpoch {global_step} ({global_step+1}/{self.max_global_steps})  - "
    for tag, value in tag_scalar_dict.items():
        if isinstance(value, torch.Tensor):
            value = value.item()
        log_line += f'{tag.replace(" ", "_")}: {value}\t'

    self._write_to_log_file([log_line])
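Note the tag normalization in the loop above: a call such as the following (illustrative values) produces the TensorBoard tags train_loss and valid_loss and appends one combined line to the experiment log file:

sg_logger.add_scalars({"Train Loss": 0.412, "Valid Loss": 0.518}, global_step=3)
# TensorBoard tags: "train_loss", "valid_loss" (lowercased, spaces replaced by underscores)
# Log file line:    Epoch 3 (4/<max_epochs>)  - Train_Loss: 0.412  Valid_Loss: 0.518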

add_video(tag, video, global_step=None)

Add a single video to SGLogger. Typically, this function will add a video to tensorboard, save it to disk or add it to experiment management framework.

Parameters:

    tag (str, required): Data identifier
    video (Union[torch.Tensor, np.array], required): the video to add. Shape (N,T,C,H,W) or (T,C,H,W). The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
    global_step (int, default None): Global step value to record
Source code in src/super_gradients/common/sg_loggers/base_sg_logger.py
@multi_process_safe
def add_video(self, tag: str, video: Union[torch.Tensor, np.array], global_step: int = None):
    """
    Add a single video to SGLogger.
    Typically, this function will add a video to tensorboard, save it to disk or add it to experiment management framework.

    :param tag: Data identifier
    :param video: the video to add. shape (N,T,C,H,W) or (T,C,H,W). The values should lie in [0, 255] for type uint8 or [0, 1] for type float.
    :param global_step: Global step value to record
    """
    if video.ndim < 5:
        video = video[None]  # add a leading batch dimension: (T, C, H, W) -> (1, T, C, H, W)
    self.tensorboard_writer.add_video(tag=tag, video=video, global_step=global_step)
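For example, a single unbatched clip can be passed directly and the missing batch dimension is added internally (illustrative tensor; sg_logger is assumed to be a BaseSGLogger instance):

import torch

clip = torch.rand(16, 3, 64, 64)  # (T, C, H, W), float values in [0, 1]
sg_logger.add_video("samples/clip", clip, global_step=0)  # logged with shape (1, 16, 3, 64, 64)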

upload()

Upload the local tensorboard and log files to the remote system.

Source code in src/super_gradients/common/sg_loggers/base_sg_logger.py
@multi_process_safe
def upload(self):
    """Upload the local tensorboard and log files to remote system."""
    self.flush()

    if self.save_tensorboard_remote:
        self.model_checkpoints_data_interface.save_remote_tensorboard_event_files(self.experiment_name, self._local_dir)

    if self.save_logs_remote:
        log_file_name = self.experiment_log_path.split("/")[-1]
        self.model_checkpoints_data_interface.save_remote_checkpoints_file(self.experiment_name, self._local_dir, log_file_name)

ClearMLSGLogger

Bases: BaseSGLogger

Source code in src/super_gradients/common/sg_loggers/clearml_sg_logger.py
@register_sg_logger("clearml_sg_logger")
class ClearMLSGLogger(BaseSGLogger):
    def __init__(
        self,
        project_name: str,
        experiment_name: str,
        storage_location: str,
        resumed: bool,
        training_params: dict,
        checkpoints_dir_path: str,
        tb_files_user_prompt: bool = False,
        launch_tensorboard: bool = False,
        tensorboard_port: int = None,
        save_checkpoints_remote: bool = True,
        save_tensorboard_remote: bool = True,
        save_logs_remote: bool = True,
        monitor_system: bool = None,
    ):
        """
        :param project_name:            ClearML project name that can include many experiments
        :param experiment_name:         Name used for logging and loading purposes
        :param storage_location:        If set to 's3' (e.g. s3://my-bucket) saves the checkpoints in AWS S3, otherwise saves the checkpoints locally
        :param resumed:                 If true, then old tensorboard files will **NOT** be deleted when tb_files_user_prompt=True
        :param training_params:         training_params for the experiment.
        :param checkpoints_dir_path:    Local root directory path where all experiment logging directories will reside.
        :param tb_files_user_prompt:    Asks user for Tensorboard deletion prompt.
        :param launch_tensorboard:      Whether to launch a TensorBoard process.
        :param tensorboard_port:        Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
        :param save_checkpoints_remote: Saves checkpoints in s3.
        :param save_tensorboard_remote: Saves tensorboard in s3.
        :param save_logs_remote:        Saves log files in s3.
        :param monitor_system:          Not Available for ClearML logger. Save the system statistics (GPU utilization, CPU, ...) in the tensorboard
        """
        if monitor_system is not None:
            logger.warning("monitor_system not available on ClearMLSGLogger. To remove this warning, please don't set monitor_system in your logger parameters")

        self.s3_location_available = storage_location.startswith("s3")
        super().__init__(
            project_name=project_name,
            experiment_name=experiment_name,
            storage_location=storage_location,
            resumed=resumed,
            training_params=training_params,
            checkpoints_dir_path=checkpoints_dir_path,
            tb_files_user_prompt=tb_files_user_prompt,
            launch_tensorboard=launch_tensorboard,
            tensorboard_port=tensorboard_port,
            save_checkpoints_remote=self.s3_location_available,
            save_tensorboard_remote=self.s3_location_available,
            save_logs_remote=self.s3_location_available,
            monitor_system=False,
        )

        if _imported_clear_ml_failure is not None:
            raise _imported_clear_ml_failure

        self.setup(project_name, experiment_name)

        self.save_checkpoints = save_checkpoints_remote
        self.save_tensorboard = save_tensorboard_remote
        self.save_logs = save_logs_remote

    @multi_process_safe
    def setup(self, project_name, experiment_name):
        from multiprocessing.process import BaseProcess

        # Prevent clearml from modifying os.fork and BaseProcess.run, which can cause a DataLoader to crash (if num_workers > 0)
        # Issue opened here: https://github.com/allegroai/clearml/issues/790
        default_fork, default_run = os.fork, BaseProcess.run
        self.task = Task.init(
            project_name=project_name,  # project name of at least 3 characters
            task_name=experiment_name,  # task name of at least 3 characters
            continue_last_task=0,  # This prevents clearml from adding an offset to the epoch
            auto_connect_arg_parser=False,
            auto_connect_frameworks=False,
            auto_resource_monitoring=False,
            auto_connect_streams=True,
        )
        os.fork, BaseProcess.run = default_fork, default_run
        self.clearml_logger = self.task.get_logger()

    @multi_process_safe
    def add_config(self, tag: str, config: dict):
        super(ClearMLSGLogger, self).add_config(tag=tag, config=config)
        self.task.connect(config)

    def __add_scalar(self, tag: str, scalar_value: float, global_step: int):
        self.clearml_logger.report_scalar(title=tag, series=tag, value=scalar_value, iteration=global_step)

    @multi_process_safe
    def add_scalar(self, tag: str, scalar_value: float, global_step: Union[int, TimeUnit] = 0):
        super(ClearMLSGLogger, self).add_scalar(tag=tag, scalar_value=scalar_value, global_step=global_step)
        if isinstance(global_step, TimeUnit):
            global_step = global_step.get_value()
        self.__add_scalar(tag=tag, scalar_value=scalar_value, global_step=global_step)

    @multi_process_safe
    def add_scalars(self, tag_scalar_dict: dict, global_step: int = 0):
        super(ClearMLSGLogger, self).add_scalars(tag_scalar_dict=tag_scalar_dict, global_step=global_step)
        for tag, scalar_value in tag_scalar_dict.items():
            self.__add_scalar(tag=tag, scalar_value=scalar_value, global_step=global_step)

    def __add_image(
        self,
        tag: str,
        image: Union[torch.Tensor, np.array, Image.Image],
        global_step: int,
    ):
        if isinstance(image, torch.Tensor):
            image = image.cpu().detach().numpy()
        if image.shape[0] < 5:
            # Heuristic: a small leading dimension is assumed to be the channel axis (CHW); convert to HWC
            image = image.transpose([1, 2, 0])
        self.clearml_logger.report_image(
            title=tag,
            series=tag,
            image=image,
            iteration=global_step,
            max_image_history=-1,
        )

    @multi_process_safe
    def add_image(
        self,
        tag: str,
        image: Union[torch.Tensor, np.array, Image.Image],
        data_format="CHW",
        global_step: int = 0,
    ):
        super(ClearMLSGLogger, self).add_image(tag=tag, image=image, data_format=data_format, global_step=global_step)
        self.__add_image(tag, image, global_step)

    @multi_process_safe
    def add_images(
        self,
        tag: str,
        images: Union[torch.Tensor, np.array],
        data_format="NCHW",
        global_step: int = 0,
    ):
        super(ClearMLSGLogger, self).add_images(tag=tag, images=images, data_format=data_format, global_step=global_step)
        for image in images:
            self.__add_image(tag, image, global_step)

    @multi_process_safe
    def add_video(self, tag: str, video: Union[torch.Tensor, np.array], global_step: int = 0):
        super().add_video(tag, video, global_step)
        logger.warning("ClearMLSGLogger does not support uploading video to clearML from a tensor/array.")

    @multi_process_safe
    def add_histogram(
        self,
        tag: str,
        values: Union[torch.Tensor, np.array],
        bins: str,
        global_step: int = 0,
    ):
        super().add_histogram(tag, values, bins, global_step)
        self.clearml_logger.report_histogram(title=tag, series=tag, iteration=global_step, values=values)

    @multi_process_safe
    def add_text(self, tag: str, text_string: str, global_step: int = 0):
        super().add_text(tag, text_string, global_step)
        self.clearml_logger.report_text(text_string)

    @multi_process_safe
    def add_figure(self, tag: str, figure: plt.figure, global_step: int = 0):
        super().add_figure(tag, figure, global_step)
        name = f"tmp_{tag}.png"
        path = os.path.join(self._local_dir, name)
        figure.savefig(path)
        self.task.upload_artifact(name=name, artifact_object=path)
        os.remove(path)

    @multi_process_safe
    def close(self):
        super().close()
        self.task.close()

    @multi_process_safe
    def add_file(self, file_name: str = None):
        super().add_file(file_name)
        self.task.upload_artifact(name=file_name, artifact_object=os.path.join(self._local_dir, file_name))

    @multi_process_safe
    def upload(self):
        super().upload()

        if self.save_tensorboard:
            name = self._get_tensorboard_file_name().split("/")[-1]
            self.task.upload_artifact(name=name, artifact_object=self._get_tensorboard_file_name())

        if self.save_logs:
            name = self.experiment_log_path.split("/")[-1]
            self.task.upload_artifact(name=name, artifact_object=self.experiment_log_path)

    @multi_process_safe
    def add_checkpoint(self, tag: str, state_dict: dict, global_step: int = 0):
        state_dict = self._sanitize_checkpoint(state_dict)

        name = f"ckpt_{global_step}.pth" if tag is None else tag
        if not name.endswith(".pth"):
            name += ".pth"

        path = os.path.join(self._local_dir, name)
        torch.save(state_dict, path)

        if self.save_checkpoints:
            if self.s3_location_available:
                self.model_checkpoints_data_interface.save_remote_checkpoints_file(self.experiment_name, self._local_dir, name)
            self.task.upload_artifact(name=name, artifact_object=path)

    def _get_tensorboard_file_name(self):
        try:
            tb_file_path = self.tensorboard_writer.file_writer.event_writer._file_name
        except RuntimeError:
            logger.warning("tensorboard file could not be located for ")
            return None

        return tb_file_path

    def add(self, tag: str, obj: Any, global_step: int = None):
        pass
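
One detail worth calling out from add_checkpoint above: when no tag is given, checkpoints are named by global step, and a .pth suffix is always enforced. A small standalone sketch of that naming rule, mirroring the logic above:

def checkpoint_name(tag, global_step):
    # Mirrors the naming logic in add_checkpoint: fall back to the step-based
    # name when no tag is given, and always enforce the .pth suffix.
    name = f"ckpt_{global_step}.pth" if tag is None else tag
    if not name.endswith(".pth"):
        name += ".pth"
    return name

assert checkpoint_name(None, 7) == "ckpt_7.pth"
assert checkpoint_name("best", 7) == "best.pth"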

__init__(project_name, experiment_name, storage_location, resumed, training_params, checkpoints_dir_path, tb_files_user_prompt=False, launch_tensorboard=False, tensorboard_port=None, save_checkpoints_remote=True, save_tensorboard_remote=True, save_logs_remote=True, monitor_system=None)

Parameters:

project_name (str, required): ClearML project name that can include many experiments
experiment_name (str, required): Name used for logging and loading purposes
storage_location (str, required): If set to 's3' (i.e. s3://my-bucket) saves the Checkpoints in AWS S3, otherwise saves the Checkpoints locally
resumed (bool, required): If true, then old tensorboard files will NOT be deleted when tb_files_user_prompt=True
training_params (dict, required): training_params for the experiment.
checkpoints_dir_path (str, required): Local root directory path where all experiment logging directories will reside.
tb_files_user_prompt (bool, default False): Asks user for Tensorboard deletion prompt.
launch_tensorboard (bool, default False): Whether to launch a TensorBoard process.
tensorboard_port (int, default None): Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
save_checkpoints_remote (bool, default True): Saves checkpoints in s3.
save_tensorboard_remote (bool, default True): Saves tensorboard in s3.
save_logs_remote (bool, default True): Saves log files in s3.
monitor_system (bool, default None): Not available for the ClearML logger. Save the system statistics (GPU utilization, CPU, ...) in the tensorboard
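
For orientation, a minimal usage sketch: rather than instantiating the logger directly, it is normally selected through the Trainer's training hyperparameters via the sg_logger / sg_logger_params keys. The project name below is a placeholder; experiment_name, checkpoints_dir_path and the other required arguments are filled in by the trainer itself.

training_params = {
    # ... other training hyperparameters ...
    "sg_logger": "clearml_sg_logger",          # registered name of ClearMLSGLogger
    "sg_logger_params": {
        "project_name": "my_clearml_project",  # placeholder project name
        "save_checkpoints_remote": True,
        "save_tensorboard_remote": True,
        "save_logs_remote": True,
    },
}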

DagsHubSGLogger

Bases: BaseSGLogger

Source code in src/super_gradients/common/sg_loggers/dagshub_sg_logger.py
@register_sg_logger("dagshub_sg_logger")
class DagsHubSGLogger(BaseSGLogger):
    def __init__(
        self,
        project_name: str,
        experiment_name: str,
        storage_location: str,
        resumed: bool,
        training_params: dict,
        checkpoints_dir_path: str,
        tb_files_user_prompt: bool = False,
        launch_tensorboard: bool = False,
        tensorboard_port: int = None,
        save_checkpoints_remote: bool = True,
        save_tensorboard_remote: bool = True,
        save_logs_remote: bool = True,
        monitor_system: bool = None,
        dagshub_repository: Optional[str] = None,
        log_mlflow_only: bool = False,
    ):
        """

        :param experiment_name:         Name used for logging and loading purposes
        :param storage_location:        If set to 's3' (i.e. s3://my-bucket) saves the Checkpoints in AWS S3 otherwise saves the Checkpoints Locally
        :param resumed:                 If true, then old tensorboard files will **NOT** be deleted when tb_files_user_prompt=True
        :param training_params:         training_params for the experiment.
        :param checkpoints_dir_path:    Local root directory path where all experiment logging directories will reside.
        :param tb_files_user_prompt:    Asks user for Tensorboard deletion prompt.
        :param launch_tensorboard:      Whether to launch a TensorBoard process.
        :param tensorboard_port:        Specific port number for the tensorboard to use when launched (when set to None,
                                        some free port number will be used)
        :param save_checkpoints_remote: Saves checkpoints in s3 and DagsHub.
        :param save_tensorboard_remote: Saves tensorboard in s3.
        :param save_logs_remote:        Saves log files in s3 and DagsHub.
        :param monitor_system:          Save the system statistics (GPU utilization, CPU, ...) in the tensorboard
        :param dagshub_repository:     Expected format: <dagshub_username>/<dagshub_reponame>. Make sure the format
                                        is set correctly to avoid any potential issues. If you are utilizing the
                                        dagshub_sg_logger, please specify the dagshub_repository in sg_logger_params
                                        to prevent any interruptions from prompts during automated pipelines. In the
                                        event that the repository does not exist, it will be created automatically
                                        on your behalf.
        :param log_mlflow_only:         Skip logging to DVC, use MLflow for all artifacts being logged
        """
        if monitor_system is not None:
            logger.warning("monitor_system not available on DagsHubSGLogger. To remove this warning, please don't set monitor_system in your logger parameters")

        self.s3_location_available = storage_location.startswith("s3")
        super().__init__(
            project_name=project_name,
            experiment_name=experiment_name,
            storage_location=storage_location,
            resumed=resumed,
            training_params=training_params,
            checkpoints_dir_path=checkpoints_dir_path,
            tb_files_user_prompt=tb_files_user_prompt,
            launch_tensorboard=launch_tensorboard,
            tensorboard_port=tensorboard_port,
            save_checkpoints_remote=self.s3_location_available,
            save_tensorboard_remote=self.s3_location_available,
            save_logs_remote=self.s3_location_available,
            monitor_system=False,
        )
        if _import_dagshub_error:
            raise _import_dagshub_error

        if _import_mlflow_error:
            raise _import_mlflow_error

        self.repo_name, self.repo_owner, self.remote = None, None, None
        if dagshub_repository:
            self.repo_name, self.repo_owner = self.splitter(dagshub_repository)

        dagshub_auth = os.getenv("DAGSHUB_USER_TOKEN")
        if dagshub_auth:
            dagshub.auth.add_app_token(dagshub_auth)

        self._init_env_dependency()

        self.log_mlflow_only = log_mlflow_only
        self.save_checkpoints_dagshub = save_checkpoints_remote
        self.save_logs_dagshub = save_logs_remote

    @staticmethod
    def splitter(repo):
        splitted = repo.split("/")
        if len(splitted) != 2:
            raise Exception(f"Invalid input, should be owner_name/repo_name, but got {repo} instead")
        return splitted[1], splitted[0]

    def _init_env_dependency(self):
        """
        The function creates paths for the DVC directory, models, and artifacts, obtains an authentication token from
        Dagshub, and sets MLflow tracking credentials. It also checks whether the repository name and owner have been
        set and prompts the user to enter them if they haven't. If the remote URI is not set or does not include
        "dagshub", Dagshub is initialized with the repository name and owner, and the remote URI is obtained. The method
        then creates a Repo object with the repository information and sets the DVC folder to the DVC directory path.
        """

        self.paths = {
            "dvc_directory": Path("artifacts"),
            "models": Path("models"),
            "artifacts": Path("artifacts"),
        }

        token = dagshub.auth.get_token()
        os.environ["MLFLOW_TRACKING_USERNAME"] = token
        os.environ["MLFLOW_TRACKING_PASSWORD"] = token

        # Prompt for the repository owner/name if they were not provided:
        if not self.repo_name or not self.repo_owner:
            self.repo_name, self.repo_owner = self.splitter(input("Please insert your repository owner_name/repo_name:"))

        if not self.remote or "dagshub" not in os.getenv("MLFLOW_TRACKING_URI"):
            dagshub.init(repo_name=self.repo_name, repo_owner=self.repo_owner)
            self.remote = os.getenv("MLFLOW_TRACKING_URI")

        self.repo = Repo(
            owner=self.remote.split("/")[-2],
            name=self.remote.split("/")[-1].replace(".mlflow", ""),
            branch=os.getenv("BRANCH", "main"),
        )
        self.dvc_folder = self.repo.directory(str(self.paths["dvc_directory"]))

        mlflow.set_tracking_uri(self.remote)
        mlflow.set_experiment(self.experiment_name)
        self.run = mlflow.start_run(nested=True)
        return self.run

    @multi_process_safe
    def _dvc_add(self, local_path="", remote_path=""):
        if not os.path.isfile(local_path):
            raise FileNotFoundError(f"Invalid file path: {local_path}")
        self.dvc_folder.add(file=local_path, path=remote_path)

    @multi_process_safe
    def _dvc_commit(self, commit=""):
        self.dvc_folder.commit(commit, versioning="dvc", force=True)

    @multi_process_safe
    def _get_nested_dict_values(self, d, parent_key="", sep="/"):
        items = []
        for k, v in d.items():
            new_key = parent_key + sep + k if parent_key else k
            if isinstance(v, Mapping):
                items.extend(self._get_nested_dict_values(v, new_key, sep=sep))
            else:
                items.append((new_key, v))
        return items

    @multi_process_safe
    def _sanitize_special_characters(self, text):
        pattern = r"[!\"#$%&'()*+,:;<=>?@[\]^`{|}~\t\n\r\x0b\x0c]"
        valid_text = re.sub(pattern, "_", text)
        return valid_text

    @multi_process_safe
    def add_config(self, tag: str, config: dict):
        super(DagsHubSGLogger, self).add_config(tag=tag, config=config)
        flatten_dict = self._get_nested_dict_values(d=config)
        for k, v in flatten_dict:
            try:
                k_sanitized = self._sanitize_special_characters(k)
                mlflow.log_params({k_sanitized: v})
            except Exception as e:
                err_msg = f"Fail to log the config: {k}, got an expection: {e}"
                logger.warning(err_msg)

    @multi_process_safe
    def add_scalar(self, tag: str, scalar_value: float, global_step: Union[int, TimeUnit] = 0):
        super(DagsHubSGLogger, self).add_scalar(tag=tag, scalar_value=scalar_value, global_step=global_step)
        try:
            if isinstance(global_step, TimeUnit):
                global_step = global_step.get_value()

            tag_sanitized = self._sanitize_special_characters(tag)
            mlflow.log_metric(key=tag_sanitized, value=scalar_value, step=global_step)
        except Exception as e:
            err_msg = f"Fail to log the metric: {tag}, got an expection: {e}"
            raise Exception(err_msg)

    @multi_process_safe
    def add_scalars(self, tag_scalar_dict: dict, global_step: int = 0):
        super(DagsHubSGLogger, self).add_scalars(tag_scalar_dict=tag_scalar_dict, global_step=global_step)
        try:
            mlflow.log_metrics(metrics=tag_scalar_dict, step=global_step)
        except Exception:
            flatten_dicts = self._get_nested_dict_values(tag_scalar_dict)
            for k, v in flatten_dicts:
                try:
                    if isinstance(v, torch.Tensor):
                        v = v.item()
                    else:
                        v = float(v)
                    self.add_scalar(tag=k.replace("@", "at"), scalar_value=v, global_step=global_step)
                except Exception as e:
                    logger.warning(e)

    @multi_process_safe
    def close(self):
        super().close()
        try:
            if not self.log_mlflow_only:
                self._dvc_commit(commit=f"Adding all artifacts from run {mlflow.active_run().info.run_id}")
            mlflow.end_run()
        except Exception:
            pass

    @multi_process_safe
    def add_file(self, file_name: str = None):
        super().add_file(file_name)
        if self.log_mlflow_only:
            mlflow.log_artifact(file_name)
        else:
            self._dvc_add(local_path=file_name, remote_path=os.path.join(self.paths["artifacts"], self.experiment_log_path))

    @multi_process_safe
    def upload(self):
        super().upload()

        if self.save_logs_dagshub:
            if self.log_mlflow_only:
                mlflow.log_artifact(self.experiment_log_path)
            else:
                self._dvc_add(local_path=self.experiment_log_path, remote_path=os.path.join(self.paths["artifacts"], self.experiment_log_path))

    @multi_process_safe
    def add_checkpoint(self, tag: str, state_dict: dict, global_step: int = 0):
        state_dict = self._sanitize_checkpoint(state_dict)
        name = f"ckpt_{global_step}.pth" if tag is None else tag
        if not name.endswith(".pth"):
            name += ".pth"
        path = os.path.join(self._local_dir, name)
        torch.save(state_dict, path)
        if self.save_checkpoints_dagshub:
            mlflow.log_artifact(path)
            if (global_step >= (self.max_global_steps - 1)) and not self.log_mlflow_only:
                self._dvc_add(local_path=path, remote_path=os.path.join(self.paths["models"], name))
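
To make the add_config behavior above concrete, here is a small self-contained sketch of the flattening step (the same logic as _get_nested_dict_values, reimplemented standalone for illustration):

from collections.abc import Mapping

def flatten_config(d, parent_key="", sep="/"):
    # Walk a nested mapping and emit ("outer/inner", value) pairs,
    # mirroring _get_nested_dict_values above.
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, Mapping):
            items.extend(flatten_config(v, new_key, sep=sep))
        else:
            items.append((new_key, v))
    return items

print(flatten_config({"optimizer": {"lr": 0.1, "momentum": 0.9}, "epochs": 10}))
# -> [('optimizer/lr', 0.1), ('optimizer/momentum', 0.9), ('epochs', 10)]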

__init__(project_name, experiment_name, storage_location, resumed, training_params, checkpoints_dir_path, tb_files_user_prompt=False, launch_tensorboard=False, tensorboard_port=None, save_checkpoints_remote=True, save_tensorboard_remote=True, save_logs_remote=True, monitor_system=None, dagshub_repository=None, log_mlflow_only=False)

Parameters:

experiment_name (str, required): Name used for logging and loading purposes
storage_location (str, required): If set to 's3' (i.e. s3://my-bucket) saves the Checkpoints in AWS S3, otherwise saves the Checkpoints locally
resumed (bool, required): If true, then old tensorboard files will NOT be deleted when tb_files_user_prompt=True
training_params (dict, required): training_params for the experiment.
checkpoints_dir_path (str, required): Local root directory path where all experiment logging directories will reside.
tb_files_user_prompt (bool, default False): Asks user for Tensorboard deletion prompt.
launch_tensorboard (bool, default False): Whether to launch a TensorBoard process.
tensorboard_port (int, default None): Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
save_checkpoints_remote (bool, default True): Saves checkpoints in s3 and DagsHub.
save_tensorboard_remote (bool, default True): Saves tensorboard in s3.
save_logs_remote (bool, default True): Saves log files in s3 and DagsHub.
monitor_system (bool, default None): Save the system statistics (GPU utilization, CPU, ...) in the tensorboard
dagshub_repository (Optional[str], default None): Expected format: <dagshub_username>/<dagshub_reponame>. Make sure the format is set correctly to avoid any potential issues. If you are utilizing the dagshub_sg_logger, please specify the dagshub_repository in sg_logger_params to prevent any interruptions from prompts during automated pipelines. In the event that the repository does not exist, it will be created automatically on your behalf.
log_mlflow_only (bool, default False): Skip logging to DVC, use MLflow for all artifacts being logged
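
A hedged usage sketch along the same lines as the other loggers; the repository and token values are placeholders. Setting DAGSHUB_USER_TOKEN up front (the environment variable the logger reads at init time) and passing dagshub_repository avoids the interactive authentication and repository prompts during automated runs.

import os

os.environ["DAGSHUB_USER_TOKEN"] = "<your-token>"  # placeholder; read by the logger at init time

training_params = {
    # ... other training hyperparameters ...
    "sg_logger": "dagshub_sg_logger",              # registered name of DagsHubSGLogger
    "sg_logger_params": {
        "dagshub_repository": "my_user/my_repo",   # placeholder <dagshub_username>/<dagshub_reponame>
        "log_mlflow_only": False,                  # also version artifacts with DVC
    },
}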

DeciPlatformSGLogger

Bases: BaseSGLogger

Logger responsible to push logs and tensorboard artifacts to Deci platform.

Source code in src/super_gradients/common/sg_loggers/deci_platform_sg_logger.py
@register_sg_logger("deci_platform_sg_logger")
class DeciPlatformSGLogger(BaseSGLogger):
    """Logger responsible to push logs and tensorboard artifacts to Deci platform."""

    def __init__(
        self,
        project_name: str,
        experiment_name: str,
        storage_location: str,
        resumed: bool,
        training_params: dict,
        checkpoints_dir_path: str,
        model_name: str,
        upload_model: bool = True,
        tb_files_user_prompt: bool = False,
        launch_tensorboard: bool = False,
        tensorboard_port: int = None,
        save_checkpoints_remote: bool = True,
        save_tensorboard_remote: bool = True,
        save_logs_remote: bool = True,
        monitor_system: bool = True,
    ):
        """

        :param experiment_name:         Name used for logging and loading purposes
        :param storage_location:        If set to 's3' (i.e. s3://my-bucket) saves the Checkpoints in AWS S3 otherwise saves the Checkpoints Locally
        :param resumed:                 If true, then old tensorboard files will **NOT** be deleted when tb_files_user_prompt=True
        :param training_params:         training_params for the experiment.
        :param checkpoints_dir_path:    Local root directory path where all experiment logging directories will reside.
        :param model_name:              Name of the model to be used for logging.
        :param upload_model:            Whether to upload the model to the Deci Platform or not.
        :param tb_files_user_prompt:    Asks user for Tensorboard deletion prompt.
        :param launch_tensorboard:      Whether to launch a TensorBoard process.
        :param tensorboard_port:       Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
        :param save_checkpoints_remote: Saves checkpoints in s3.
        :param save_tensorboard_remote: Saves tensorboard in s3.
        :param save_logs_remote:        Saves log files in s3.
        :param monitor_system:          Save the system statistics (GPU utilization, CPU, ...) in the tensorboard
        """
        super().__init__(
            project_name=project_name,
            experiment_name=experiment_name,
            storage_location=storage_location,
            resumed=resumed,
            training_params=training_params,
            checkpoints_dir_path=checkpoints_dir_path,
            tb_files_user_prompt=tb_files_user_prompt,
            launch_tensorboard=launch_tensorboard,
            tensorboard_port=tensorboard_port,
            save_checkpoints_remote=save_checkpoints_remote,
            save_tensorboard_remote=save_tensorboard_remote,
            save_logs_remote=save_logs_remote,
            monitor_system=monitor_system,
        )
        self.platform_client = DeciClient()
        self.platform_client.register_experiment(name=experiment_name, model_name=model_name if model_name else None, resume=resumed)
        self.checkpoints_dir_path = checkpoints_dir_path
        self.upload_model = upload_model

    @multi_process_safe
    def upload(self):
        """
        Upload both to the destination specified by the user (base behavior), and to Deci platform.
        """
        # Upload to the destination specified by the user
        super(DeciPlatformSGLogger, self).upload()

        # Upload to Deci platform
        if not os.path.isdir(self.checkpoints_dir_path):
            raise ValueError("Provided directory does not exist")

        self._upload_latest_file_starting_with(start_with=TENSORBOARD_EVENTS_PREFIX)
        self._upload_latest_file_starting_with(start_with=EXPERIMENT_LOGS_PREFIX)
        self._upload_latest_file_starting_with(start_with=LOGGER_LOGS_PREFIX)
        self._upload_latest_file_starting_with(start_with=CONSOLE_LOGS_PREFIX)
        self._upload_folder_files(folder_name=".hydra")

    @multi_process_safe
    def _save_checkpoint(self, path: str, state_dict: dict) -> None:
        """Save the Checkpoint locally, and then upload it to Deci platform if required.

        :param path:        Full path of the checkpoint
        :param state_dict:  State dict of the checkpoint
        """
        super(DeciPlatformSGLogger, self)._save_checkpoint(path=path, state_dict=state_dict)
        if self.upload_model:
            self._save_experiment_file(file_path=path)

    @multi_process_safe
    def _upload_latest_file_starting_with(self, start_with: str):
        """
        Upload the most recent file starting with a specific prefix to the Deci platform.

        :param start_with: prefix of the file to upload
        """

        files_path = [
            os.path.join(self.checkpoints_dir_path, file_name) for file_name in os.listdir(self.checkpoints_dir_path) if file_name.startswith(start_with)
        ]

        most_recent_file_path = max(files_path, key=os.path.getctime)
        self._save_experiment_file(file_path=most_recent_file_path)

    @multi_process_safe
    def _upload_folder_files(self, folder_name: str):
        """
        Upload all the files of a given folder.

        :param folder_name: Name of the folder that contains the files to upload
        """
        folder_path = os.path.join(self.checkpoints_dir_path, folder_name)

        if not os.path.exists(folder_path):
            return

        for file in os.listdir(folder_path):
            self._save_experiment_file(file_path=f"{folder_path}/{file}")

    def _save_experiment_file(self, file_path: str):
        with log_stdout():  # TODO: remove when platform_client remove prints from save_experiment_file
            self.platform_client.save_experiment_file(file_path=file_path)
        logger.info(f"File saved to Deci platform: {file_path}")

__init__(project_name, experiment_name, storage_location, resumed, training_params, checkpoints_dir_path, model_name, upload_model=True, tb_files_user_prompt=False, launch_tensorboard=False, tensorboard_port=None, save_checkpoints_remote=True, save_tensorboard_remote=True, save_logs_remote=True, monitor_system=True)

Parameters:

experiment_name (str, required): Name used for logging and loading purposes
storage_location (str, required): If set to 's3' (i.e. s3://my-bucket) saves the Checkpoints in AWS S3, otherwise saves the Checkpoints locally
resumed (bool, required): If true, then old tensorboard files will NOT be deleted when tb_files_user_prompt=True
training_params (dict, required): training_params for the experiment.
checkpoints_dir_path (str, required): Local root directory path where all experiment logging directories will reside.
model_name (str, required): Name of the model to be used for logging.
upload_model (bool, default True): Whether to upload the model to the Deci Platform or not.
tb_files_user_prompt (bool, default False): Asks user for Tensorboard deletion prompt.
launch_tensorboard (bool, default False): Whether to launch a TensorBoard process.
tensorboard_port (int, default None): Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
save_checkpoints_remote (bool, default True): Saves checkpoints in s3.
save_tensorboard_remote (bool, default True): Saves tensorboard in s3.
save_logs_remote (bool, default True): Saves log files in s3.
monitor_system (bool, default True): Save the system statistics (GPU utilization, CPU, ...) in the tensorboard

upload()

Upload both to the destination specified by the user (base behavior), and to Deci platform.


log_stdout()

Redirect stdout to DEBUG.

Source code in src/super_gradients/common/sg_loggers/deci_platform_sg_logger.py
@contextmanager
def log_stdout():
    """Redirect stdout to DEBUG."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        yield

    redirected_str = buffer.getvalue()
    if redirected_str:
        logger.debug(msg=redirected_str)
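
For illustration, a short usage sketch (assuming the module-level logger used above): anything printed inside the block is captured and re-emitted once at DEBUG level instead of reaching the console.

with log_stdout():
    print("chatty output from a third-party call")  # captured by the buffer
# on exit, the captured text is forwarded via logger.debug(...)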

EpochNumber dataclass

Bases: TimeUnit

A time unit for epoch number.

Source code in src/super_gradients/common/sg_loggers/time_units.py
@dataclasses.dataclass
class EpochNumber(TimeUnit):
    """
    A time unit for epoch number.
    """

    value: float

    def get_value(self):
        return self.value

    def get_name(self):
        return "epoch"

GlobalBatchStepNumber dataclass

Bases: TimeUnit

A time unit for representing the total number of batches processed, including both training and validation batches. Suppose the training loader has 320 batches and the validation loader has 80 batches. If the current epoch index is 2 (zero-based), and we are 50 batches (zero-based) into the validation loader, then the global batch step is (320 + 80) * 2 + 320 + 50 = 1170.

Source code in src/super_gradients/common/sg_loggers/time_units.py
@dataclasses.dataclass
class GlobalBatchStepNumber(TimeUnit):
    """
    A time unit for representing the total number of batches processed, including both training and validation batches.
    Suppose the training loader has 320 batches and the validation loader has 80 batches.
    If the current epoch index is 2 (zero-based), and we are 50 batches (zero-based) into the validation loader,
    then the global batch step is (320 + 80) * 2 + 320 + 50 = 1170.
    """

    value: float

    def get_value(self):
        return self.value

    def get_name(self):
        return "global_batch_step"

TimeUnit

Bases: abc.ABC

Abstract class for time units. This is used to explicitly log the time unit of a metric/loss.

Source code in src/super_gradients/common/sg_loggers/time_units.py
class TimeUnit(abc.ABC):
    """
    Abstract class for time units. This is used to explicitly log the time unit of a metric/loss.
    """

    @abc.abstractmethod
    def get_value(self):
        ...

    @abc.abstractmethod
    def get_name(self):
        ...

WandBSGLogger

Bases: BaseSGLogger

Source code in src/super_gradients/common/sg_loggers/wandb_sg_logger.py
@register_sg_logger("wandb_sg_logger")
class WandBSGLogger(BaseSGLogger):
    def __init__(
        self,
        project_name: str,
        experiment_name: str,
        storage_location: str,
        resumed: bool,
        training_params: dict,
        checkpoints_dir_path: str,
        tb_files_user_prompt: bool = False,
        launch_tensorboard: bool = False,
        tensorboard_port: int = None,
        save_checkpoints_remote: bool = True,
        save_tensorboard_remote: bool = True,
        save_logs_remote: bool = True,
        entity: Optional[str] = None,
        api_server: Optional[str] = None,
        save_code: bool = False,
        monitor_system: bool = None,
        save_checkpoint_as_artifact: bool = False,
        **kwargs,
    ):
        """

        :param experiment_name:         Name used for logging and loading purposes
        :param storage_location:        If set to 's3' (i.e. s3://my-bucket) saves the Checkpoints in AWS S3 otherwise saves the Checkpoints Locally
        :param resumed:                 If true, then old tensorboard files will **NOT** be deleted when tb_files_user_prompt=True
        :param training_params:         training_params for the experiment.
        :param checkpoints_dir_path:    Local root directory path where all experiment logging directories will reside.
        :param tb_files_user_prompt:    Asks user for Tensorboard deletion prompt.
        :param launch_tensorboard:      Whether to launch a TensorBoard process.
        :param tensorboard_port:        Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
        :param save_checkpoints_remote: Saves checkpoints in s3.
        :param save_tensorboard_remote: Saves tensorboard in s3.
        :param save_logs_remote:        Saves log files in s3.
        :param monitor_system:         Not available for the WandB logger. Save the system statistics (GPU utilization, CPU, ...) in the tensorboard
        :param save_code:               Save current code to wandb
        :param save_checkpoint_as_artifact: Save model checkpoint using a Weights & Biases Artifact. Note that setting this option to True saves model
                                        checkpoints every epoch as a versioned artifact, which results in increased storage usage on
                                        Weights & Biases.
        """
        if monitor_system is not None:
            logger.warning("monitor_system not available on WandBSGLogger. To remove this warning, please don't set monitor_system in your logger parameters")

        self.s3_location_available = storage_location.startswith("s3")
        super().__init__(
            project_name=project_name,
            experiment_name=experiment_name,
            storage_location=storage_location,
            resumed=resumed,
            training_params=training_params,
            checkpoints_dir_path=checkpoints_dir_path,
            tb_files_user_prompt=tb_files_user_prompt,
            launch_tensorboard=launch_tensorboard,
            tensorboard_port=tensorboard_port,
            save_checkpoints_remote=self.s3_location_available,
            save_tensorboard_remote=self.s3_location_available,
            save_logs_remote=self.s3_location_available,
            monitor_system=False,
        )

        if api_server is not None:
            if api_server != env_variables.WANDB_BASE_URL:
                logger.warning(f"WANDB_BASE_URL environment parameter not set to {api_server}. Setting the parameter")
                os.environ["WANDB_BASE_URL"] = api_server

        # allow passing an arbitrary pre-defined wandb_id
        wandb_id = kwargs.pop("wandb_id", None)

        self.resumed = resumed
        if self.resumed:
            if wandb_id is None:
                if self._resume_from_remote_sg_logger:
                    raise RuntimeError(
                        "For WandB loggers, when training_params.resume_from_remote_sg_logger=True "
                        "pass the run id through the wandb_id arg in sg_logger_params"
                    )
                wandb_id = self._get_wandb_id()

        if wandb.run is None:
            run = wandb.init(project=project_name, name=experiment_name, entity=entity, resume=resumed, id=wandb_id, **kwargs)
        else:
            logger.warning(
                "A Weights & Biases run was initialized before initializing `WandBSGLogger`. "
                "This means that `super-gradients` cannot control the run ID to which this session will be logged."
            )
            logger.warning(f"In order to resume this run please call `wandb.init(id={wandb.run.id}, resume='must')` before reinitializing `WandBSGLogger`.")
            run = wandb.run

        if save_code:
            self._save_code_lines()

        self._set_wandb_id(run.id)
        self.save_checkpoints_wandb = save_checkpoints_remote
        self.save_tensorboard_wandb = save_tensorboard_remote
        self.save_logs_wandb = save_logs_remote
        self.save_checkpoint_as_artifact = save_checkpoint_as_artifact

    @multi_process_safe
    def _save_code_lines(self):
        """
        Save the current code to wandb.
        If a file named .wandbinclude is available in the root dir of the project, the settings will be taken from that file.
        Otherwise, all Python files in the current working dir (recursively) will be saved.
        File structure: a single relative path or a single file type in each line.
        i.e.:

        src
        tests
        examples
        *.py
        *.yaml

        The paths and types in the file are the paths and types to be included in code upload to wandb
        """
        base_path, paths, types = self._get_include_paths()

        if len(types) > 0:

            def func(path):
                for p in paths:
                    if path.startswith(p):
                        for t in types:
                            if path.endswith(t):
                                return True
                return False

            include_fn = func
        else:
            include_fn = lambda path: path.endswith(".py")

        if base_path != ".":
            wandb.run.log_code(base_path, include_fn=include_fn)
        else:
            wandb.run.log_code(".", include_fn=include_fn)

    @multi_process_safe
    def add_config(self, tag: str, config: dict):
        super(WandBSGLogger, self).add_config(tag=tag, config=config)
        wandb.config.update(config, allow_val_change=self.resumed)

    @multi_process_safe
    def add_scalar(self, tag: str, scalar_value: float, global_step: Union[int, TimeUnit] = 0):
        super(WandBSGLogger, self).add_scalar(tag=tag, scalar_value=scalar_value, global_step=global_step)
        if isinstance(global_step, TimeUnit):
            wandb.log(data={tag: scalar_value, global_step.get_name(): global_step.get_value()})
        else:
            wandb.log(data={tag: scalar_value}, step=global_step)

    @multi_process_safe
    def add_scalars(self, tag_scalar_dict: dict, global_step: int = 0):
        super(WandBSGLogger, self).add_scalars(tag_scalar_dict=tag_scalar_dict, global_step=global_step)
        wandb.log(data=tag_scalar_dict, step=global_step)

    @multi_process_safe
    def add_image(self, tag: str, image: Union[torch.Tensor, np.array, Image.Image], data_format="CHW", global_step: int = 0):
        super(WandBSGLogger, self).add_image(tag=tag, image=image, data_format=data_format, global_step=global_step)
        if isinstance(image, torch.Tensor):
            image = image.cpu().detach().numpy()
        if image.shape[0] < 5:
            # Heuristic: a small leading dimension is assumed to be the channel axis (CHW); convert to HWC
            image = image.transpose([1, 2, 0])
        wandb.log(data={tag: wandb.Image(image, caption=tag)}, step=global_step)

    @multi_process_safe
    def add_images(self, tag: str, images: Union[torch.Tensor, np.array], data_format="NCHW", global_step: int = 0):
        super(WandBSGLogger, self).add_images(tag=tag, images=images, data_format=data_format, global_step=global_step)

        wandb_images = []
        for im in images:
            if isinstance(im, torch.Tensor):
                im = im.cpu().detach().numpy()

            if im.shape[0] < 5:
                im = im.transpose([1, 2, 0])
            wandb_images.append(wandb.Image(im))
        wandb.log({tag: wandb_images}, step=global_step)

    @multi_process_safe
    def add_video(self, tag: str, video: Union[torch.Tensor, np.array], global_step: int = 0):
        super().add_video(tag, video, global_step)

        if video.ndim > 4:
            for index, vid in enumerate(video):
                self.add_video(tag=f"{tag}_{index}", video=vid, global_step=global_step)
        else:
            if isinstance(video, torch.Tensor):
                video = video.cpu().detach().numpy()
            wandb.log({tag: wandb.Video(video, fps=4)}, step=global_step)

    @multi_process_safe
    def add_histogram(self, tag: str, values: Union[torch.Tensor, np.array], bins: str, global_step: int = 0):
        super().add_histogram(tag, values, bins, global_step)
        wandb.log({tag: wandb.Histogram(values, num_bins=bins)}, step=global_step)

    @multi_process_safe
    def add_text(self, tag: str, text_string: str, global_step: int = 0):
        super().add_text(tag, text_string, global_step)
        wandb.log({tag: text_string}, step=global_step)

    @multi_process_safe
    def add_figure(self, tag: str, figure: plt.figure, global_step: int = 0):
        super().add_figure(tag, figure, global_step)
        wandb.log({tag: figure}, step=global_step)

    @multi_process_safe
    def close(self):
        super().close()
        wandb.finish()

    @multi_process_safe
    def add_file(self, file_name: str = None):
        super().add_file(file_name)
        wandb.save(glob_str=os.path.join(self._local_dir, file_name), base_path=self._local_dir, policy="now")

    @multi_process_safe
    def upload(self):
        super().upload()

        if self.save_tensorboard_wandb:
            wandb.save(glob_str=self._get_tensorboard_file_name(), base_path=self._local_dir, policy="now")

        if self.save_logs_wandb:
            wandb.save(glob_str=self.experiment_log_path, base_path=self._local_dir, policy="now")

    def _save_wandb_artifact(self, path):
        """Upload a file or a directory as a Weights & Biases Artifact.
        Note that this function can be called only after wandb.init()

        :param path: the local full path to the pth file to be uploaded
        """
        artifact = wandb.Artifact(f"{wandb.run.id}-checkpoint", type="model")
        if os.path.isdir(path):
            artifact.add_dir(path)
        elif os.path.isfile(path):
            artifact.add_file(path)
        wandb.log_artifact(artifact)

    @multi_process_safe
    def add_checkpoint(self, tag: str, state_dict: dict, global_step: int = 0):
        state_dict = self._sanitize_checkpoint(state_dict)
        name = f"ckpt_{global_step}.pth" if tag is None else tag
        if not name.endswith(".pth"):
            name += ".pth"

        path = os.path.join(self._local_dir, name)
        torch.save(state_dict, path)

        if self.save_checkpoints_wandb:
            if self.s3_location_available:
                self.model_checkpoints_data_interface.save_remote_checkpoints_file(self.experiment_name, self._local_dir, name)
            if self.save_checkpoint_as_artifact:
                self._save_wandb_artifact(path)
            else:
                wandb.save(glob_str=path, base_path=self._local_dir, policy="now")

    def _get_tensorboard_file_name(self):
        try:
            tb_file_path = self.tensorboard_writer.file_writer.event_writer._file_name
        except RuntimeError:
            logger.warning("tensorboard file could not be located for ")
            return None

        return tb_file_path

    def _get_wandb_id(self):
        for file in os.listdir(self._local_dir):
            if file.startswith(WANDB_ID_PREFIX):
                return file.replace(WANDB_ID_PREFIX, "")

    def _set_wandb_id(self, id):
        for file in os.listdir(self._local_dir):
            if file.startswith(WANDB_ID_PREFIX):
                os.remove(os.path.join(self._local_dir, file))

        os.mknod(os.path.join(self._local_dir, f"{WANDB_ID_PREFIX}{id}"))

    def add(self, tag: str, obj: Any, global_step: int = None):
        pass

    def _get_include_paths(self):
        """
        Look for .wandbinclude file in parent dirs and return the list of paths defined in the file.

        File structure: a single relative path (i.e. src/) or a single file type (i.e. *.py) in each line.
        The paths and types in the file are the paths and types to be included in the code upload to wandb.
        :return: if file exists, return the list of paths and a list of types defined in the file
        """

        wandb_include_file_path = self._search_upwards_for_file(WANDB_INCLUDE_FILE_NAME)
        if wandb_include_file_path is not None:
            with open(wandb_include_file_path) as file:
                lines = file.readlines()

            base_path = os.path.dirname(wandb_include_file_path)
            paths = []
            types = []
            for line in lines:
                line = line.strip()  # strip() already removes surrounding whitespace, including newlines
                if line == "" or line.startswith("#"):
                    continue

                if line.startswith("*."):
                    types.append(line.replace("*", ""))
                else:
                    paths.append(os.path.join(base_path, line))
            return base_path, paths, types

        return ".", [], []

    @staticmethod
    def _search_upwards_for_file(file_name: str):
        """
        Search in the current directory and all directories above it for a file of a particular name.
        :param file_name: file name to look for.
        :return: pathlib.Path, the location of the first file found or None, if none was found
        """

        try:
            cur_dir = os.getcwd()
            while cur_dir != "/":
                if file_name in os.listdir(cur_dir):
                    return os.path.join(cur_dir, file_name)
                else:
                    cur_dir = os.path.dirname(cur_dir)
        except RuntimeError:
            return None

        return None

    def download_remote_ckpt(self, *args, **kwargs):
        wandb.restore("ckpt_latest.pth", replace=True, root=self.local_dir())
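
For reference, checkpoints pushed to Weights & Biases by this logger can be pulled back in two ways, depending on save_checkpoint_as_artifact. A minimal sketch, assuming the original run's entity, project and run id are known (all concrete values below are placeholders):

import wandb

# Files uploaded with wandb.save() can be restored into a local directory;
# this mirrors what download_remote_ckpt() does for "ckpt_latest.pth".
wandb.init(project="my-project", id="abcd1234", resume="must")  # placeholder run
wandb.restore("ckpt_latest.pth", replace=True, root="./checkpoints")

# Checkpoints saved as Artifacts (save_checkpoint_as_artifact=True) are named
# "<run_id>-checkpoint" by _save_wandb_artifact() and can be downloaded
# through the public API:
api = wandb.Api()
artifact = api.artifact("my-entity/my-project/abcd1234-checkpoint:latest", type="model")
local_dir = artifact.download()  # directory containing the .pth file(s)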

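When save_code=True, the files uploaded to wandb are selected by _get_include_paths() from a .wandbinclude file found in the working directory or one of its parents. A sketch of what such a file might contain (the concrete paths and types are placeholders):

# .wandbinclude -- lines starting with "#" are skipped
src/
scripts/
*.py
*.yaml
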
__init__(project_name, experiment_name, storage_location, resumed, training_params, checkpoints_dir_path, tb_files_user_prompt=False, launch_tensorboard=False, tensorboard_port=None, save_checkpoints_remote=True, save_tensorboard_remote=True, save_logs_remote=True, entity=None, api_server=None, save_code=False, monitor_system=None, save_checkpoint_as_artifact=False, **kwargs)

save_checkpoint_as_artifact: Save model checkpoints as Weights & Biases Artifacts. Note that setting this option to True saves the model checkpoint at every epoch as a versioned artifact, which increases storage usage on Weights & Biases.

Parameters:

Name                      Type   Description                                                                                            Default
project_name              str    Name of the Weights & Biases project the run is logged to                                              required
experiment_name           str    Name used for logging and loading purposes                                                             required
storage_location          str    If set to 's3' (e.g. s3://my-bucket), saves the checkpoints in AWS S3; otherwise saves them locally    required
resumed                   bool   If True, old tensorboard files will NOT be deleted when tb_files_user_prompt=True                      required
training_params           dict   training_params for the experiment                                                                     required
checkpoints_dir_path      str    Local root directory path where all experiment logging directories will reside                         required
tb_files_user_prompt      bool   Whether to prompt the user before deleting old tensorboard files                                       False
launch_tensorboard        bool   Whether to launch a TensorBoard process                                                                False
tensorboard_port          int    Specific port number for tensorboard to use when launched (when set to None, some free port is used)   None
save_checkpoints_remote   bool   Saves checkpoints in S3                                                                                True
save_tensorboard_remote   bool   Saves tensorboard files in S3                                                                          True
save_logs_remote          bool   Saves log files in S3                                                                                  True
monitor_system            bool   Not available for the WandB logger; setting it only produces a warning                                 None
save_code                 bool   Save current code to wandb                                                                             False
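
As a usage sketch, this logger is normally selected through the Trainer's training_params rather than constructed directly; the sg_logger / sg_logger_params keys below follow that convention, and all concrete values (trainer, model, loaders, project and entity names) are placeholders:

trainer.train(
    model=model,
    training_params={
        # ... other training hyperparameters ...
        "sg_logger": "wandb_sg_logger",
        "sg_logger_params": {
            "project_name": "my-project",         # placeholder W&B project
            "entity": "my-team",                  # placeholder W&B entity/team
            "save_checkpoints_remote": True,      # upload checkpoints to W&B
            "save_checkpoint_as_artifact": True,  # version them as Artifacts
        },
    },
    train_loader=train_loader,
    valid_loader=valid_loader,
)
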
Source code in src/super_gradients/common/sg_loggers/wandb_sg_logger.py
def __init__(
    self,
    project_name: str,
    experiment_name: str,
    storage_location: str,
    resumed: bool,
    training_params: dict,
    checkpoints_dir_path: str,
    tb_files_user_prompt: bool = False,
    launch_tensorboard: bool = False,
    tensorboard_port: int = None,
    save_checkpoints_remote: bool = True,
    save_tensorboard_remote: bool = True,
    save_logs_remote: bool = True,
    entity: Optional[str] = None,
    api_server: Optional[str] = None,
    save_code: bool = False,
    monitor_system: bool = None,
    save_checkpoint_as_artifact: bool = False,
    **kwargs,
):
    """

    :param project_name:           Name of the Weights & Biases project the run is logged to
    :param experiment_name:        Name used for logging and loading purposes
    :param storage_location:        If set to 's3' (i.e. s3://my-bucket) saves the Checkpoints in AWS S3 otherwise saves the Checkpoints Locally
    :param resumed:                 If true, then old tensorboard files will **NOT** be deleted when tb_files_user_prompt=True
    :param training_params:         training_params for the experiment.
    :param checkpoints_dir_path:    Local root directory path where all experiment logging directories will reside.
    :param tb_files_user_prompt:    Asks user for Tensorboard deletion prompt.
    :param launch_tensorboard:      Whether to launch a TensorBoard process.
    :param tensorboard_port:        Specific port number for the tensorboard to use when launched (when set to None, some free port number will be used)
    :param save_checkpoints_remote: Saves checkpoints in s3.
    :param save_tensorboard_remote: Saves tensorboard in s3.
    :param save_logs_remote:        Saves log files in s3.
    :param monitor_system:         Not available for the WandB logger. Setting it has no effect other than a warning.
    :param save_code:               Save current code to wandb
    :param save_checkpoint_as_artifact: Save model checkpoints as Weights & Biases Artifacts. Note that setting this option to True saves the
                                    model checkpoint at every epoch as a versioned artifact, which increases storage usage on
                                    Weights & Biases.
    """
    if monitor_system is not None:
        logger.warning("monitor_system not available on WandBSGLogger. To remove this warning, please don't set monitor_system in your logger parameters")

    self.s3_location_available = storage_location.startswith("s3")
    super().__init__(
        project_name=project_name,
        experiment_name=experiment_name,
        storage_location=storage_location,
        resumed=resumed,
        training_params=training_params,
        checkpoints_dir_path=checkpoints_dir_path,
        tb_files_user_prompt=tb_files_user_prompt,
        launch_tensorboard=launch_tensorboard,
        tensorboard_port=tensorboard_port,
        save_checkpoints_remote=self.s3_location_available,
        save_tensorboard_remote=self.s3_location_available,
        save_logs_remote=self.s3_location_available,
        monitor_system=False,
    )

    if api_server is not None:
        if api_server != env_variables.WANDB_BASE_URL:
            logger.warning(f"WANDB_BASE_URL environment parameter not set to {api_server}. Setting the parameter")
            os.environ["WANDB_BASE_URL"] = api_server

    # allow passing an arbitrary pre-defined wandb_id
    wandb_id = kwargs.pop("wandb_id", None)

    self.resumed = resumed
    if self.resumed:
        if wandb_id is None:
            if self._resume_from_remote_sg_logger:
                raise RuntimeError(
                    "For WandB loggers, when training_params.resume_from_remote_sg_logger=True "
                    "pass the run id through the wandb_id arg in sg_logger_params"
                )
            wandb_id = self._get_wandb_id()

    if wandb.run is None:
        run = wandb.init(project=project_name, name=experiment_name, entity=entity, resume=resumed, id=wandb_id, **kwargs)
    else:
        logger.warning(
            "A Weights & Biases run was initialized before initializing `WandBSGLogger`. "
            "This means that `super-gradients` cannot control the run ID to which this session will be logged."
        )
        logger.warning(f"In order to resume this run please call `wandb.init(id={wandb.run.id}, resume='must')` before reinitializing `WandBSGLogger`.")
        run = wandb.run

    if save_code:
        self._save_code_lines()

    self._set_wandb_id(run.id)
    self.save_checkpoints_wandb = save_checkpoints_remote
    self.save_tensorboard_wandb = save_tensorboard_remote
    self.save_logs_wandb = save_logs_remote
    self.save_checkpoint_as_artifact = save_checkpoint_as_artifact
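
Tying the resume logic above together: resuming into a specific Weights & Biases run comes down to passing the run id through sg_logger_params alongside the Trainer's resume flag. A minimal sketch (project name and run id are placeholders):

training_params = {
    # ... other training hyperparameters ...
    "resume": True,                     # the Trainer passes resumed=True to the logger
    "sg_logger": "wandb_sg_logger",
    "sg_logger_params": {
        "project_name": "my-project",   # placeholder
        "wandb_id": "abcd1234",         # id of the W&B run to resume into
    },
}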