Source code for robot.output.outputfile

#  Copyright 2008-2015 Nokia Networks
#  Copyright 2016-     Robot Framework Foundation
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.

from contextlib import contextmanager
from pathlib import Path

from robot.errors import DataError
from robot.utils import get_error_message

from .jsonlogger import JsonLogger
from .loggerapi import LoggerApi
from .loglevel import LogLevel
from .xmllogger import LegacyXmlLogger, NullLogger, XmlLogger


[docs] class OutputFile(LoggerApi): def __init__( self, path: "Path|None", log_level: LogLevel, rpa: bool = False, legacy_output: bool = False, ): # `self.logger` is replaced with `NullLogger` when flattening. self.logger = self.real_logger = self._get_logger(path, rpa, legacy_output) self.is_logged = log_level.is_logged self.flatten_level = 0 self.errors = [] self._delayed_messages = None def _get_logger(self, path, rpa, legacy_output): if not path: return NullLogger() try: file = open(path, "w", encoding="UTF-8") except Exception: raise DataError( f"Opening output file '{path}' failed: {get_error_message()}" ) if path.suffix.lower() == ".json": return JsonLogger(file, rpa) if legacy_output: return LegacyXmlLogger(file, rpa) return XmlLogger(file, rpa) @property @contextmanager def delayed_logging(self): self._delayed_messages, previous = [], self._delayed_messages try: yield finally: self._release_delayed_messages() self._delayed_messages = previous @property @contextmanager def delayed_logging_paused(self): self._release_delayed_messages() self._delayed_messages = None try: yield finally: self._delayed_messages = [] def _release_delayed_messages(self): for msg in self._delayed_messages or (): self.log_message(msg, no_delay=True)
[docs] def start_suite(self, data, result): self.logger.start_suite(result)
[docs] def end_suite(self, data, result): self.logger.end_suite(result)
[docs] def start_test(self, data, result): self.logger.start_test(result)
[docs] def end_test(self, data, result): self.logger.end_test(result)
[docs] def start_keyword(self, data, result): self.logger.start_keyword(result) if result.tags.robot("flatten"): self.flatten_level += 1 self.logger = NullLogger()
[docs] def end_keyword(self, data, result): if self.flatten_level and result.tags.robot("flatten"): self.flatten_level -= 1 if self.flatten_level == 0: self.logger = self.real_logger self.logger.end_keyword(result)
[docs] def start_for(self, data, result): self.logger.start_for(result)
[docs] def end_for(self, data, result): self.logger.end_for(result)
[docs] def start_for_iteration(self, data, result): self.logger.start_for_iteration(result)
[docs] def end_for_iteration(self, data, result): self.logger.end_for_iteration(result)
[docs] def start_while(self, data, result): self.logger.start_while(result)
[docs] def end_while(self, data, result): self.logger.end_while(result)
[docs] def start_while_iteration(self, data, result): self.logger.start_while_iteration(result)
[docs] def end_while_iteration(self, data, result): self.logger.end_while_iteration(result)
[docs] def start_if(self, data, result): self.logger.start_if(result)
[docs] def end_if(self, data, result): self.logger.end_if(result)
[docs] def start_if_branch(self, data, result): self.logger.start_if_branch(result)
[docs] def end_if_branch(self, data, result): self.logger.end_if_branch(result)
[docs] def start_try(self, data, result): self.logger.start_try(result)
[docs] def end_try(self, data, result): self.logger.end_try(result)
[docs] def start_try_branch(self, data, result): self.logger.start_try_branch(result)
[docs] def end_try_branch(self, data, result): self.logger.end_try_branch(result)
[docs] def start_group(self, data, result): self.logger.start_group(result)
[docs] def end_group(self, data, result): self.logger.end_group(result)
[docs] def start_var(self, data, result): self.logger.start_var(result)
[docs] def end_var(self, data, result): self.logger.end_var(result)
[docs] def start_break(self, data, result): self.logger.start_break(result)
[docs] def end_break(self, data, result): self.logger.end_break(result)
[docs] def start_continue(self, data, result): self.logger.start_continue(result)
[docs] def end_continue(self, data, result): self.logger.end_continue(result)
[docs] def start_return(self, data, result): self.logger.start_return(result)
[docs] def end_return(self, data, result): self.logger.end_return(result)
[docs] def start_error(self, data, result): self.logger.start_error(result)
[docs] def end_error(self, data, result): self.logger.end_error(result)
[docs] def log_message(self, message, no_delay=False): if self.is_logged(message): if self._delayed_messages is None or no_delay: # Use the real logger also when flattening. self.real_logger.message(message) else: # Logging is delayed when using timeouts to avoid writing to output # files being interrupted. There are still problems, though: # https://github.com/robotframework/robotframework/issues/5417 self._delayed_messages.append(message)
[docs] def message(self, message): if message.level in ("WARN", "ERROR"): self.errors.append(message)
[docs] def statistics(self, stats): self.logger.statistics(stats)
[docs] def close(self): self.logger.errors(self.errors) self.logger.close()