Source code for fints.parser

import re
import warnings
from collections.abc import Iterable
from enum import Enum

from .formals import (
    Container, DataElementGroupField, SegmentSequence, ValueList,
)
# Ensure that all segment types are loaded (otherwise the subclass find won't see them)
from .segments import (  # noqa
    accounts, auth, bank, base, debit, depot, dialog,
    journal, message, saldo, statement, transfer,
)
from .segments.base import FinTS3Segment

# 
# FinTS 3.0 structure:
#     Message := ( Segment "'" )+
#     Segment := ( DEG "+" )+
#     DEG     := ( ( DE | DEG ) ":")+
#  
#  First DEG in segment is segment header
#  Many DEG (Data Element Group) on Segment level are just DE (Data Element),
#  Recursion DEG -> DEG must be limited, since no other separator characters
#  are available. In general, a second order DEG must have fixed length but
#  may have a variable repeat count if it is at the end of the segment
#
#  Parsing:
#    The message after detokenization/exploding is up to three levels deep:
#    1. level: Sequence of Segments
#    2. level: Sequence of Data Elements or Data Element Groups
#    3. level: Flat sequence of possibly nested Data Element or Data Element Groups
# 
#    On level 2 each item can be either a single item (a single Data Element), or
#    a sequence. A sequence on level 2 can be either a repeated Data Element, or
#    a flat representation of a Data Element Group or repeated Data Element Group.
#    An item on level 3 is always a Data Element, but which Data Element it is depends
#    on which fields have been consumed in the sequence before it.


#: Operate the parser in "robust mode". In this mode, errors during segment parsing
#: will be turned into a FinTSParserWarning and a generic FinTS3Segment (not a subclass)
#: will be constructed. This allows for all syntactically correct FinTS messages to be
#: consumed, even in the presence of errors in this library.
robust_mode = True


class FinTSParserWarning(UserWarning):
    pass


class FinTSParserError(ValueError):
    pass


TOKEN_RE = re.compile(rb"""
                        ^(?:  (?: \? (?P<ECHAR>.) )
                            | (?P<CHAR>[^?:+@']+)
                            | (?P<TOK>[+:'])
                            | (?: @ (?P<BINLEN>[0-9]+) @ )
                         )""", re.X | re.S)


class Token(Enum):
    EOF = 'eof'
    CHAR = 'char'
    BINARY = 'bin'
    PLUS = '+'
    COLON = ':'
    APOSTROPHE = "'"


class ParserState:
    def __init__(self, data: bytes, start=0, end=None, encoding='iso-8859-1'):
        self._token = None
        self._value = None
        self._encoding = encoding
        self._tokenizer = iter(self._tokenize(data, start, end or len(data), encoding))

    def peek(self):
        if not self._token:
            self._token, self._value = next(self._tokenizer)
        return self._token

    def consume(self, token=None):
        self.peek()
        if token and token != self._token:
            raise ValueError
        self._token = None
        return self._value

    @staticmethod
    def _tokenize(data, start, end, encoding):
        pos = start
        unclaimed = []
        last_was = None
        
        while pos < end:
            match = TOKEN_RE.match(data[pos:end])
            if match:
                pos += match.end()
                d = match.groupdict()
                if d['ECHAR'] is not None:
                    unclaimed.append(d['ECHAR'])
                elif d['CHAR'] is not None:
                    unclaimed.append(d['CHAR'])
                else:
                    if unclaimed:
                        if last_was in (Token.BINARY, Token.CHAR):
                            raise ValueError
                        yield Token.CHAR, b''.join(unclaimed).decode(encoding)
                        unclaimed.clear()
                        last_was = Token.CHAR

                    if d['TOK'] is not None:
                        token = Token(d['TOK'].decode('us-ascii'))
                        yield token, d['TOK']
                        last_was = token
                    elif d['BINLEN'] is not None:
                        blen = int(d['BINLEN'].decode('us-ascii'), 10)
                        if last_was in (Token.BINARY, Token.CHAR):
                            raise ValueError
                        yield Token.BINARY, data[pos:pos+blen]
                        pos += blen
                        last_was = Token.BINARY
                    else:
                        raise ValueError
            else:
                raise ValueError

        if unclaimed:
            if last_was in (Token.BINARY, Token.CHAR):
                raise ValueError
            yield Token.CHAR, b''.join(unclaimed).decode(encoding)
            unclaimed.clear()
            last_was = Token.CHAR

        yield Token.EOF, b''


[docs]class FinTS3Parser: """Parser for FinTS/HBCI 3.0 messages """
[docs] def parse_message(self, data: bytes) -> SegmentSequence: """Takes a FinTS 3.0 message as byte array, and returns a parsed segment sequence""" if isinstance(data, bytes): data = self.explode_segments(data) message = SegmentSequence() for segment in data: seg = self.parse_segment(segment) message.segments.append(seg) return message
def parse_segment(self, segment): clazz = FinTS3Segment.find_subclass(segment) try: return self._parse_segment_as_class(clazz, segment) except FinTSParserError as e: if robust_mode: warnings.warn("Ignoring parser error and returning generic object: {}. Turn off robust_mode to see Exception.".format(str(e)), FinTSParserWarning) return self._parse_segment_as_class(FinTS3Segment, segment) else: raise def _parse_segment_as_class(self, clazz, segment): seg = clazz() data = iter(segment) for name, field in seg._fields.items(): repeat = field.count != 1 constructed = isinstance(field, DataElementGroupField) if not repeat: try: val = next(data) except StopIteration: if field.required: raise FinTSParserError("Required field {}.{} was not present".format(seg.__class__.__name__, name)) break try: if not constructed: setattr(seg, name, val) else: deg = self.parse_deg_noniter(field.type, val, field.required) setattr(seg, name, deg) except ValueError as e: raise FinTSParserError("Wrong input when setting {}.{}".format(seg.__class__.__name__, name)) from e else: i = 0 while True: try: val = next(data) except StopIteration: break try: if not constructed: getattr(seg, name)[i] = val else: deg = self.parse_deg_noniter(field.type, val, field.required) getattr(seg, name)[i] = deg except ValueError as e: raise FinTSParserError("Wrong input when setting {}.{}".format(seg.__class__.__name__, name)) from e i = i + 1 if field.count is not None and i >= field.count: break if field.max_count is not None and i >= field.max_count: break seg._additional_data = list(data) return seg def parse_deg_noniter(self, clazz, data, required): if not isinstance(data, Iterable) or isinstance(data, (str, bytes)): data = [data] data_i = iter(data) retval = self.parse_deg(clazz, data_i, required) remainder = list(data_i) if remainder: raise FinTSParserError("Unparsed data {!r} after parsing {!r}".format(remainder, clazz)) return retval def parse_deg(self, clazz, data_i, required=True): retval = clazz() for number, (name, field) in enumerate(retval._fields.items()): repeat = field.count != 1 constructed = isinstance(field, DataElementGroupField) is_last = number == len(retval._fields)-1 if not repeat: try: if not constructed: try: setattr(retval, name, next(data_i)) except StopIteration: if required and field.required: raise FinTSParserError("Required field {}.{} was not present".format(retval.__class__.__name__, name)) break else: deg = self.parse_deg(field.type, data_i, required and field.required) setattr(retval, name, deg) except ValueError as e: raise FinTSParserError("Wrong input when setting {}.{}".format(retval.__class__.__name__, name)) from e else: i = 0 while True: try: if not constructed: try: getattr(retval, name)[i] = next(data_i) except StopIteration: break else: require_last = (field.max_count is None) if is_last else True deg = self.parse_deg(field.type, data_i, require_last and required and field.required) getattr(retval, name)[i] = deg except ValueError as e: raise FinTSParserError("Wrong input when setting {}.{}".format(retval.__class__.__name__, name)) from e i = i + 1 if field.count is not None and i >= field.count: break if field.max_count is not None and i >= field.max_count: break return retval @staticmethod def explode_segments(data: bytes, start=0, end=None): segments = [] parser = ParserState(data, start, end) while parser.peek() != Token.EOF: segment = [] while parser.peek() not in (Token.APOSTROPHE, Token.EOF): data = None deg = [] while parser.peek() in (Token.BINARY, Token.CHAR, Token.COLON): if parser.peek() in (Token.BINARY, Token.CHAR): data = parser.consume() elif parser.peek() == Token.COLON: deg.append(data) data = None parser.consume(Token.COLON) if data and deg: deg.append(data) if deg: data = deg segment.append(data) if parser.peek() == Token.PLUS: parser.consume(Token.PLUS) parser.consume(Token.APOSTROPHE) segments.append(segment) parser.consume(Token.EOF) return segments
[docs]class FinTS3Serializer: """Serializer for FinTS/HBCI 3.0 messages """
[docs] def serialize_message(self, message: SegmentSequence) -> bytes: """Serialize a message (as SegmentSequence, list of FinTS3Segment, or FinTS3Segment) into a byte array""" if isinstance(message, FinTS3Segment): message = SegmentSequence([message]) if isinstance(message, (list, tuple, Iterable)): message = SegmentSequence(list(message)) result = [] for segment in message.segments: result.append(self.serialize_segment(segment)) return self.implode_segments(result)
def serialize_segment(self, segment): seg = [] filler = [] for name, field in segment._fields.items(): repeat = field.count != 1 constructed = isinstance(field, DataElementGroupField) val = getattr(segment, name) empty = False if not field.required: if isinstance(val, Container): if val.is_unset(): empty = True elif isinstance(val, ValueList): if len(val) == 0: empty = True elif val is None: empty = True if empty: filler.append(None) continue else: if filler: seg.extend(filler) filler.clear() if not constructed: if repeat: seg.extend(field.render(val) for val in getattr(segment, name)) else: seg.append(field.render(getattr(segment, name))) else: if repeat: for val in getattr(segment, name): seg.append(self.serialize_deg(val)) else: seg.append(self.serialize_deg(getattr(segment, name), allow_skip=True)) if segment._additional_data: seg.extend(segment._additional_data) return seg def serialize_deg(self, deg, allow_skip=False): result = [] filler = [] for name,field in deg._fields.items(): repeat = field.count != 1 constructed = isinstance(field, DataElementGroupField) val = getattr(deg, name) empty = False if field.count == 1 and not field.required: if isinstance(val, Container): if val.is_unset(): empty = True elif isinstance(val, ValueList): if len(val) == 0: empty = True elif val is None: empty = True if empty: if allow_skip: filler.append(None) else: result.append(None) continue else: if filler: result.extend(filler) filler.clear() if not constructed: if repeat: result.extend(field.render(val) for val in getattr(deg, name)) else: result.append(field.render(getattr(deg, name))) else: if repeat: for val in getattr(deg, name): result.extend(self.serialize_deg(val)) else: result.extend(self.serialize_deg(getattr(deg, name))) return result @staticmethod def implode_segments(message: list): level1 = [] for segment in message: level2 = [] for deg in segment: if isinstance(deg, (list, tuple)): highest_index = max(((i+1) for (i, e) in enumerate(deg) if e != b'' and e is not None), default=0) level2.append( b":".join(FinTS3Serializer.escape_value(de) for de in deg[:highest_index]) ) else: level2.append(FinTS3Serializer.escape_value(deg)) level1.append(b"+".join(level2)) return b"'".join(level1) + b"'" @staticmethod def escape_value(val): if isinstance(val, str): return re.sub(r"([+:'@?])", r"?\1", val).encode('iso-8859-1') elif isinstance(val, bytes): return "@{}@".format(len(val)).encode('us-ascii') + val elif val is None: return b'' else: raise TypeError("Can only escape str, bytes and None")