# Copyright 2008-2015 Nokia Networks
# Copyright 2016- Robot Framework Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from contextlib import contextmanager
from pathlib import Path
from robot.errors import DataError
from robot.utils import get_error_message
from .jsonlogger import JsonLogger
from .loggerapi import LoggerApi
from .loglevel import LogLevel
from .xmllogger import LegacyXmlLogger, NullLogger, XmlLogger
[docs]
class OutputFile(LoggerApi):
def __init__(
self,
path: "Path|None",
log_level: LogLevel,
rpa: bool = False,
legacy_output: bool = False,
):
# `self.logger` is replaced with `NullLogger` when flattening.
self.logger = self.real_logger = self._get_logger(path, rpa, legacy_output)
self.is_logged = log_level.is_logged
self.flatten_level = 0
self.errors = []
self._delayed_messages = None
def _get_logger(self, path, rpa, legacy_output):
if not path:
return NullLogger()
try:
file = open(path, "w", encoding="UTF-8")
except Exception:
raise DataError(
f"Opening output file '{path}' failed: {get_error_message()}"
)
if path.suffix.lower() == ".json":
return JsonLogger(file, rpa)
if legacy_output:
return LegacyXmlLogger(file, rpa)
return XmlLogger(file, rpa)
@property
@contextmanager
def delayed_logging(self):
self._delayed_messages, previous = [], self._delayed_messages
try:
yield
finally:
self._release_delayed_messages()
self._delayed_messages = previous
@property
@contextmanager
def delayed_logging_paused(self):
self._release_delayed_messages()
self._delayed_messages = None
try:
yield
finally:
self._delayed_messages = []
def _release_delayed_messages(self):
for msg in self._delayed_messages or ():
self.log_message(msg, no_delay=True)
[docs]
def start_suite(self, data, result):
self.logger.start_suite(result)
[docs]
def end_suite(self, data, result):
self.logger.end_suite(result)
[docs]
def start_test(self, data, result):
self.logger.start_test(result)
[docs]
def end_test(self, data, result):
self.logger.end_test(result)
[docs]
def start_keyword(self, data, result):
self.logger.start_keyword(result)
if result.tags.robot("flatten"):
self.flatten_level += 1
self.logger = NullLogger()
[docs]
def end_keyword(self, data, result):
if self.flatten_level and result.tags.robot("flatten"):
self.flatten_level -= 1
if self.flatten_level == 0:
self.logger = self.real_logger
self.logger.end_keyword(result)
[docs]
def start_for(self, data, result):
self.logger.start_for(result)
[docs]
def end_for(self, data, result):
self.logger.end_for(result)
[docs]
def start_for_iteration(self, data, result):
self.logger.start_for_iteration(result)
[docs]
def end_for_iteration(self, data, result):
self.logger.end_for_iteration(result)
[docs]
def start_while(self, data, result):
self.logger.start_while(result)
[docs]
def end_while(self, data, result):
self.logger.end_while(result)
[docs]
def start_while_iteration(self, data, result):
self.logger.start_while_iteration(result)
[docs]
def end_while_iteration(self, data, result):
self.logger.end_while_iteration(result)
[docs]
def start_if(self, data, result):
self.logger.start_if(result)
[docs]
def end_if(self, data, result):
self.logger.end_if(result)
[docs]
def start_if_branch(self, data, result):
self.logger.start_if_branch(result)
[docs]
def end_if_branch(self, data, result):
self.logger.end_if_branch(result)
[docs]
def start_try(self, data, result):
self.logger.start_try(result)
[docs]
def end_try(self, data, result):
self.logger.end_try(result)
[docs]
def start_try_branch(self, data, result):
self.logger.start_try_branch(result)
[docs]
def end_try_branch(self, data, result):
self.logger.end_try_branch(result)
[docs]
def start_group(self, data, result):
self.logger.start_group(result)
[docs]
def end_group(self, data, result):
self.logger.end_group(result)
[docs]
def start_var(self, data, result):
self.logger.start_var(result)
[docs]
def end_var(self, data, result):
self.logger.end_var(result)
[docs]
def start_break(self, data, result):
self.logger.start_break(result)
[docs]
def end_break(self, data, result):
self.logger.end_break(result)
[docs]
def start_continue(self, data, result):
self.logger.start_continue(result)
[docs]
def end_continue(self, data, result):
self.logger.end_continue(result)
[docs]
def start_return(self, data, result):
self.logger.start_return(result)
[docs]
def end_return(self, data, result):
self.logger.end_return(result)
[docs]
def start_error(self, data, result):
self.logger.start_error(result)
[docs]
def end_error(self, data, result):
self.logger.end_error(result)
[docs]
def log_message(self, message, no_delay=False):
if self.is_logged(message):
if self._delayed_messages is None or no_delay:
# Use the real logger also when flattening.
self.real_logger.message(message)
else:
# Logging is delayed when using timeouts to avoid writing to output
# files being interrupted. There are still problems, though:
# https://github.com/robotframework/robotframework/issues/5417
self._delayed_messages.append(message)
[docs]
def message(self, message):
if message.level in ("WARN", "ERROR"):
self.errors.append(message)
[docs]
def statistics(self, stats):
self.logger.statistics(stats)
[docs]
def close(self):
self.logger.errors(self.errors)
self.logger.close()